From 6899223e4296841d80dcbbb09fee6f5f4e42131c Mon Sep 17 00:00:00 2001 From: Amy Yan Date: Wed, 1 Nov 2023 13:39:05 +1100 Subject: [PATCH] feat: bump `@matrixai/rpc` to `0.2.6` and moved `timeoutMiddleware` from `src/client` to `@matrixai/rpc` chore: lintfix [ci-skip] --- package-lock.json | 8 +- package.json | 2 +- src/PolykeyClient.ts | 2 +- src/client/ClientService.ts | 2 +- src/client/index.ts | 1 - src/client/middleware.ts | 31 +-- src/client/timeoutMiddleware.ts | 104 --------- src/client/types.ts | 30 +-- src/nodes/NodeConnectionManager.ts | 2 +- src/nodes/agent/handlers/VaultsGitInfoGet.ts | 4 +- src/nodes/agent/handlers/VaultsGitPackGet.ts | 7 +- src/nodes/agent/types.ts | 37 ++-- tests/client/timeoutMiddleware.test.ts | 213 ------------------- 13 files changed, 50 insertions(+), 393 deletions(-) delete mode 100644 src/client/timeoutMiddleware.ts delete mode 100644 tests/client/timeoutMiddleware.test.ts diff --git a/package-lock.json b/package-lock.json index 2f1274192e..e6e795d7e0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -21,7 +21,7 @@ "@matrixai/mdns": "^1.2.3", "@matrixai/quic": "^1.0.0", "@matrixai/resources": "^1.1.5", - "@matrixai/rpc": "^0.2.4", + "@matrixai/rpc": "^0.2.6", "@matrixai/timer": "^1.1.2", "@matrixai/workers": "^1.3.7", "@matrixai/ws": "^1.1.7", @@ -1637,9 +1637,9 @@ "integrity": "sha512-m/DEZEe3wHqWEPTyoBtzFF6U9vWYhEnQtGgwvqiAlTxTM0rk96UBpWjDZCTF/vYG11ZlmlQFtg5H+zGgbjaB3Q==" }, "node_modules/@matrixai/rpc": { - "version": "0.2.4", - "resolved": "https://registry.npmjs.org/@matrixai/rpc/-/rpc-0.2.4.tgz", - "integrity": "sha512-OvIjAE00aYuufyeMbpMD+QXMAqrLVHNHMOU8kEGTiOg4fKoPi95/taD4jNvRsPFtvxY3SsHaRfkI9zN6Lf6iUA==", + "version": "0.2.6", + "resolved": "https://registry.npmjs.org/@matrixai/rpc/-/rpc-0.2.6.tgz", + "integrity": "sha512-FLd/w1vLSYYFBMpaQ2VG4haUyWugD9/CkSLPusMz/mfY/Q2gGmtXb7LfuQhsAqasU6n8iA+7fEF3u3T3fLeLBA==", "dependencies": { "@matrixai/async-init": "^1.9.4", "@matrixai/contexts": "^1.2.0", diff --git a/package.json b/package.json index 0b77717f41..54fb51bc5d 100644 --- a/package.json +++ b/package.json @@ -77,7 +77,7 @@ "@matrixai/mdns": "^1.2.3", "@matrixai/quic": "^1.0.0", "@matrixai/resources": "^1.1.5", - "@matrixai/rpc": "^0.2.4", + "@matrixai/rpc": "^0.2.6", "@matrixai/timer": "^1.1.2", "@matrixai/workers": "^1.3.7", "@matrixai/ws": "^1.1.7", diff --git a/src/PolykeyClient.ts b/src/PolykeyClient.ts index ccd7531bf9..95c3fe4d9e 100644 --- a/src/PolykeyClient.ts +++ b/src/PolykeyClient.ts @@ -297,7 +297,7 @@ class PolykeyClient { optionsDefaulted.rpcParserBufferSize, ), toError: networkUtils.toError, - streamKeepAliveTimeoutTime: optionsDefaulted.rpcCallTimeoutTime, + timeoutTime: optionsDefaulted.rpcCallTimeoutTime, logger: this.logger.getChild(RPCClient.name), }); this._nodeId = nodeId_; diff --git a/src/client/ClientService.ts b/src/client/ClientService.ts index a193ac2c85..9d48514e73 100644 --- a/src/client/ClientService.ts +++ b/src/client/ClientService.ts @@ -83,7 +83,7 @@ class ClientService { this.logger = logger ?? new Logger(this.constructor.name); this.rpcServer = new RPCServer({ idGen, - handlerTimeoutTime: rpcCallTimeoutTime, + timeoutTime: rpcCallTimeoutTime, middlewareFactory: rpcMiddleware.defaultServerMiddlewareWrapper( middlewareFactory, rpcParserBufferSize, diff --git a/src/client/index.ts b/src/client/index.ts index 5bd24a8c3a..c339d6e97e 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -5,6 +5,5 @@ export * as handlers from './handlers'; export * as utils from './utils'; export * as middleware from './middleware'; export * as authenticationMiddleware from './authenticationMiddleware'; -export * as timeoutMiddleWare from './timeoutMiddleware'; export * as errors from './errors'; export * as types from './types'; diff --git a/src/client/middleware.ts b/src/client/middleware.ts index 06b1795642..e556e840d5 100644 --- a/src/client/middleware.ts +++ b/src/client/middleware.ts @@ -8,7 +8,6 @@ import type { import type SessionManager from '../sessions/SessionManager'; import type KeyRing from '../keys/KeyRing'; import * as authenticationMiddlewareUtils from './authenticationMiddleware'; -import * as timeoutMiddlewareUtils from './timeoutMiddleware'; function middlewareServer( sessionManager: SessionManager, @@ -26,24 +25,15 @@ function middlewareServer( ); return (ctx, cancel, meta) => { const authMiddleware = authMiddlewareFactory(ctx, cancel, meta); - const timeoutMiddleware = timeoutMiddlewareUtils.timeoutMiddlewareServer( - ctx, - cancel, - meta, - ); // Order is auth -> timeout return { forward: { writable: authMiddleware.forward.writable, - readable: authMiddleware.forward.readable.pipeThrough( - timeoutMiddleware.forward, - ), + readable: authMiddleware.forward.readable, }, reverse: { - writable: timeoutMiddleware.reverse.writable, - readable: timeoutMiddleware.reverse.readable.pipeThrough( - authMiddleware.reverse, - ), + writable: authMiddleware.reverse.writable, + readable: authMiddleware.reverse.readable, }, }; }; @@ -61,24 +51,15 @@ function middlewareClient( authenticationMiddlewareUtils.authenticationMiddlewareClient(session); return (ctx, cancel, meta) => { const authMiddleware = authMiddlewareFactory(ctx, cancel, meta); - const timeoutMiddleware = timeoutMiddlewareUtils.timeoutMiddlewareClient( - ctx, - cancel, - meta, - ); // Order is timeout -> auth return { forward: { - writable: timeoutMiddleware.forward.writable, - readable: timeoutMiddleware.forward.readable.pipeThrough( - authMiddleware.forward, - ), + writable: authMiddleware.forward.writable, + readable: authMiddleware.forward.readable, }, reverse: { writable: authMiddleware.reverse.writable, - readable: authMiddleware.reverse.readable.pipeThrough( - timeoutMiddleware.reverse, - ), + readable: authMiddleware.reverse.readable, }, }; }; diff --git a/src/client/timeoutMiddleware.ts b/src/client/timeoutMiddleware.ts deleted file mode 100644 index a66198962a..0000000000 --- a/src/client/timeoutMiddleware.ts +++ /dev/null @@ -1,104 +0,0 @@ -import type { ContextTimed } from '@matrixai/contexts'; -import type { JSONRPCRequest, JSONRPCResponse } from '@matrixai/rpc'; -import type { ClientRPCRequestParams, ClientRPCResponseResult } from './types'; -import type { JSONValue } from '@matrixai/rpc'; -import { TransformStream } from 'stream/web'; - -/** - * This adds its timeout to the reverse metadata and updates it's timeout based - * on the forward metadata. - */ -function timeoutMiddlewareServer( - ctx: ContextTimed, - _cancel: (reason?: any) => void, - _meta: Record | undefined, -) { - const currentTimeout = ctx.timer.delay; - // Flags for tracking if the first message has been processed - let forwardFirst = true; - let reverseFirst = true; - return { - forward: new TransformStream< - JSONRPCRequest, - JSONRPCRequest - >({ - transform: (chunk, controller) => { - controller.enqueue(chunk); - if (forwardFirst) { - forwardFirst = false; - const clientTimeout = chunk.params?.metadata?.timeout; - - if (clientTimeout == null) return; - if (clientTimeout < currentTimeout) ctx.timer.reset(clientTimeout); - } - }, - }), - reverse: new TransformStream< - JSONRPCResponse, - JSONRPCResponse - >({ - transform: (chunk, controller) => { - if (reverseFirst) { - reverseFirst = false; - if ('result' in chunk) { - if (chunk.result.metadata == null) chunk.result.metadata = {}; - chunk.result.metadata.timeout = currentTimeout; - } - } - controller.enqueue(chunk); - }, - }), - }; -} - -/** - * This adds its own timeout to the forward metadata and updates it's timeout - * based on the reverse metadata. - * @param ctx - * @param _cancel - * @param _meta - */ -function timeoutMiddlewareClient( - ctx: ContextTimed, - _cancel: (reason?: any) => void, - _meta: Record | undefined, -) { - const currentTimeout = ctx.timer.delay; - // Flags for tracking if the first message has been processed - let forwardFirst = true; - let reverseFirst = true; - return { - forward: new TransformStream< - JSONRPCRequest, - JSONRPCRequest - >({ - transform: (chunk, controller) => { - if (forwardFirst) { - forwardFirst = false; - if (chunk.params == null) chunk.params = {}; - if (chunk.params.metadata == null) chunk.params.metadata = {}; - chunk.params.metadata.timeout = currentTimeout; - } - controller.enqueue(chunk); - }, - }), - reverse: new TransformStream< - JSONRPCResponse, - JSONRPCResponse - >({ - transform: (chunk, controller) => { - controller.enqueue(chunk); - if (reverseFirst) { - reverseFirst = false; - if ('result' in chunk) { - const clientTimeout = chunk.result?.metadata?.timeout; - if (clientTimeout == null) return; - if (clientTimeout < currentTimeout) ctx.timer.reset(clientTimeout); - } - } - }, - }), - }; -} - -export { timeoutMiddlewareServer, timeoutMiddlewareClient }; diff --git a/src/client/types.ts b/src/client/types.ts index d154d36d8a..56e4bf08b4 100644 --- a/src/client/types.ts +++ b/src/client/types.ts @@ -1,4 +1,9 @@ -import type { JSONValue, ObjectEmpty } from '../types'; +import type { + JSONObject, + JSONValue, + JSONRPCParams, + JSONRPCResult, +} from '@matrixai/rpc'; import type { GestaltIdEncoded, IdentityId, @@ -18,27 +23,22 @@ import type { Notification } from '../notifications/types'; import type { ProviderToken } from '../identities/types'; // Prevent overwriting the metadata type with `Omit<>` -type ClientRPCRequestParams = ObjectEmpty> = - { - metadata?: { - [Key: string]: JSONValue; - } & Partial<{ - authorization: string; - timeout: number; - }>; - } & Omit; +type ClientRPCRequestParams = { + metadata?: { + [Key: string]: JSONValue; + } & Partial<{ + authorization: string; + }>; +} & JSONRPCParams; // Prevent overwriting the metadata type with `Omit<>` -type ClientRPCResponseResult< - T extends Record = ObjectEmpty, -> = { +type ClientRPCResponseResult = { metadata?: { [Key: string]: JSONValue; } & Partial<{ authorization: string; - timeout: number; }>; -} & Omit; +} & JSONRPCResult; type StatusResultMessage = { pid: number; diff --git a/src/nodes/NodeConnectionManager.ts b/src/nodes/NodeConnectionManager.ts index 113a93b534..7ae5fe7367 100644 --- a/src/nodes/NodeConnectionManager.ts +++ b/src/nodes/NodeConnectionManager.ts @@ -404,7 +404,7 @@ class NodeConnectionManager { this.rpcParserBufferSize, ), fromError: networkUtils.fromError, - handlerTimeoutTime: this.rpcCallTimeoutTime, + timeoutTime: this.rpcCallTimeoutTime, logger: this.logger.getChild(RPCServer.name), }); diff --git a/src/nodes/agent/handlers/VaultsGitInfoGet.ts b/src/nodes/agent/handlers/VaultsGitInfoGet.ts index ed9e32f876..0c820129b1 100644 --- a/src/nodes/agent/handlers/VaultsGitInfoGet.ts +++ b/src/nodes/agent/handlers/VaultsGitInfoGet.ts @@ -1,10 +1,10 @@ import type { DB } from '@matrixai/db'; import type Logger from '@matrixai/logger'; +import type { JSONObject, JSONRPCRequest } from '@matrixai/rpc'; import type { ContextTimed } from '@matrixai/contexts'; import type ACL from '../../../acl/ACL'; import type VaultManager from '../../../vaults/VaultManager'; import type { JSONValue } from '../../../types'; -import type { JSONRPCRequest } from '@matrixai/rpc'; import { ReadableStream } from 'stream/web'; import { RawHandler } from '@matrixai/rpc'; import * as agentErrors from '../errors'; @@ -28,7 +28,7 @@ class VaultsGitInfoGet extends RawHandler<{ _cancel, meta: Record | undefined, _ctx: ContextTimed, // TODO: use - ): Promise<[JSONValue, ReadableStream]> => { + ): Promise<[JSONObject, ReadableStream]> => { const { db, vaultManager, acl } = this.container; const [headerMessage, inputStream] = input; await inputStream.cancel(); diff --git a/src/nodes/agent/handlers/VaultsGitPackGet.ts b/src/nodes/agent/handlers/VaultsGitPackGet.ts index b6a0472edf..9fdec6f82f 100644 --- a/src/nodes/agent/handlers/VaultsGitPackGet.ts +++ b/src/nodes/agent/handlers/VaultsGitPackGet.ts @@ -1,10 +1,9 @@ import type { DB } from '@matrixai/db'; +import type { JSONObject, JSONRPCRequest } from '@matrixai/rpc'; import type { PassThrough } from 'readable-stream'; import type { VaultName } from '../../../vaults/types'; import type ACL from '../../../acl/ACL'; import type VaultManager from '../../../vaults/VaultManager'; -import type { JSONValue } from '../../../types'; -import type { JSONRPCRequest } from '@matrixai/rpc'; import { ReadableStream } from 'stream/web'; import { RawHandler } from '@matrixai/rpc'; import * as agentErrors from '../errors'; @@ -26,7 +25,7 @@ class VaultsGitPackGet extends RawHandler<{ input: [JSONRPCRequest, ReadableStream], _cancel, meta, - ): Promise<[JSONValue, ReadableStream]> => { + ): Promise<[JSONObject, ReadableStream]> => { const { vaultManager, acl, db } = this.container; const [headerMessage, inputStream] = input; const requestingNodeId = agentUtils.nodeIdFromMeta(meta); @@ -103,7 +102,7 @@ class VaultsGitPackGet extends RawHandler<{ sideBand.destroy(e); }, }); - return [null, outputStream]; + return [{}, outputStream]; }; } diff --git a/src/nodes/agent/types.ts b/src/nodes/agent/types.ts index a962423964..2aa91905ac 100644 --- a/src/nodes/agent/types.ts +++ b/src/nodes/agent/types.ts @@ -1,30 +1,25 @@ +import type { JSONObject, JSONRPCParams, JSONRPCResult } from '@matrixai/rpc'; import type { SignedTokenEncoded } from '../../tokens/types'; import type { ClaimIdEncoded, NodeIdEncoded, VaultIdEncoded } from '../../ids'; import type { VaultAction, VaultName } from '../../vaults/types'; import type { SignedNotification } from '../../notifications/types'; -import type { JSONValue, ObjectEmpty } from '../../types'; +import type { JSONValue } from '../../types'; -// Prevent overwriting the metadata type with `Omit<>` -type AgentRPCRequestParams = ObjectEmpty> = - { - metadata?: { - [Key: string]: JSONValue; - } & Partial<{ - authorization: string; - timeout: number; - }>; - } & Omit; +type AgentRPCRequestParams = { + metadata?: { + [Key: string]: JSONValue; + } & Partial<{ + authorization: string; + }>; +} & JSONRPCParams; -// Prevent overwriting the metadata type with `Omit<>` -type AgentRPCResponseResult = ObjectEmpty> = - { - metadata?: { - [Key: string]: JSONValue; - } & Partial<{ - authorization: string; - timeout: number; - }>; - } & Omit; +type AgentRPCResponseResult = { + metadata?: { + [Key: string]: JSONValue; + } & Partial<{ + authorization: string; + }>; +} & JSONRPCResult; type ClaimIdMessage = { claimIdEncoded: ClaimIdEncoded; diff --git a/tests/client/timeoutMiddleware.test.ts b/tests/client/timeoutMiddleware.test.ts deleted file mode 100644 index c4b2f2ef01..0000000000 --- a/tests/client/timeoutMiddleware.test.ts +++ /dev/null @@ -1,213 +0,0 @@ -import type { ContextTimed } from '@matrixai/contexts'; -import type { - ClientRPCRequestParams, - ClientRPCResponseResult, -} from '@/client/types'; -import type { TLSConfig } from '@/network/types'; -import fs from 'fs'; -import path from 'path'; -import os from 'os'; -import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; -import { DB } from '@matrixai/db'; -import { Timer } from '@matrixai/timer'; -import { - RPCClient, - UnaryCaller, - UnaryHandler, - middleware as rpcUtilsMiddleware, -} from '@matrixai/rpc'; -import { WebSocketClient } from '@matrixai/ws'; -import KeyRing from '@/keys/KeyRing'; -import TaskManager from '@/tasks/TaskManager'; -import CertManager from '@/keys/CertManager'; -import ClientService from '@/client/ClientService'; -import * as timeoutMiddleware from '@/client/timeoutMiddleware'; -import * as keysUtils from '@/keys/utils'; -import * as networkUtils from '@/network/utils'; -import * as utils from '@/utils'; -import * as testsUtils from '../utils'; - -describe('timeoutMiddleware', () => { - const logger = new Logger('agentUnlock test', LogLevel.WARN, [ - new StreamHandler(), - ]); - const password = 'helloWorld'; - const localhost = '127.0.0.1'; - let dataDir: string; - let db: DB; - let keyRing: KeyRing; - let taskManager: TaskManager; - let certManager: CertManager; - let clientService: ClientService; - let clientClient: WebSocketClient; - let tlsConfig: TLSConfig; - - beforeEach(async () => { - dataDir = await fs.promises.mkdtemp( - path.join(os.tmpdir(), 'polykey-test-'), - ); - const keysPath = path.join(dataDir, 'keys'); - const dbPath = path.join(dataDir, 'db'); - db = await DB.createDB({ - dbPath, - logger, - }); - keyRing = await KeyRing.createKeyRing({ - password, - keysPath, - logger, - passwordOpsLimit: keysUtils.passwordOpsLimits.min, - passwordMemLimit: keysUtils.passwordMemLimits.min, - strictMemoryLock: false, - }); - taskManager = await TaskManager.createTaskManager({ db, logger }); - certManager = await CertManager.createCertManager({ - db, - keyRing, - taskManager, - logger, - }); - tlsConfig = await testsUtils.createTLSConfig(keyRing.keyPair); - }); - afterEach(async () => { - await taskManager.stopProcessing(); - await taskManager.stopTasks(); - await clientService?.stop({ force: true }); - await clientClient?.destroy({ force: true }); - await certManager.stop(); - await taskManager.stop(); - await keyRing.stop(); - await db.stop(); - await fs.promises.rm(dataDir, { - force: true, - recursive: true, - }); - }); - test('server side timeout updates', async () => { - // Setup - const ctxProm = utils.promise(); - class EchoHandler extends UnaryHandler< - { logger: Logger }, - ClientRPCRequestParams, - ClientRPCResponseResult - > { - public handle = async ( - input: ClientRPCRequestParams, - _cancel, - _meta, - ctx, - ) => { - ctxProm.resolveP(ctx); - return input; - }; - } - clientService = new ClientService({ - tlsConfig, - middlewareFactory: timeoutMiddleware.timeoutMiddlewareServer, - logger: logger.getChild(ClientService.name), - }); - await clientService.start({ - manifest: { - testHandler: new EchoHandler({ logger }), - }, - host: localhost, - }); - clientClient = await WebSocketClient.createWebSocketClient({ - config: { - verifyPeer: false, - }, - host: localhost, - port: clientService.port, - logger, - }); - const rpcClient = new RPCClient({ - manifest: { - testHandler: new UnaryCaller< - ClientRPCRequestParams, - ClientRPCResponseResult - >(), - }, - streamFactory: () => clientClient.connection.newStream(), - toError: networkUtils.toError, - middlewareFactory: rpcUtilsMiddleware.defaultClientMiddlewareWrapper( - timeoutMiddleware.timeoutMiddlewareClient, - ), - logger, - }); - - // Doing the test - const timer = new Timer({ - delay: 100, - }); - await rpcClient.methods.testHandler({}, { timer }); - - const ctx = await ctxProm.p; - expect(ctx.timer.delay).toBe(100); - timer.cancel(); - await timer.then( - () => {}, - () => {}, - ); - await clientService.stop({ force: true }); - }); - test('client side timeout updates', async () => { - // Setup - class EchoHandler extends UnaryHandler< - { logger: Logger }, - ClientRPCRequestParams, - ClientRPCResponseResult - > { - public handle = async ( - input: ClientRPCRequestParams, - _cancel, - _meta, - _ctx, - ) => { - return input; - }; - } - clientService = new ClientService({ - tlsConfig, - middlewareFactory: timeoutMiddleware.timeoutMiddlewareServer, - rpcCallTimeoutTime: 100, - logger: logger.getChild(ClientService.name), - }); - await clientService.start({ - manifest: { - testHandler: new EchoHandler({ logger }), - }, - host: localhost, - }); - clientClient = await WebSocketClient.createWebSocketClient({ - config: { - verifyPeer: false, - }, - host: localhost, - port: clientService.port, - logger, - }); - const rpcClient = new RPCClient({ - idGen: async () => null, - manifest: { - testHandler: new UnaryCaller< - ClientRPCRequestParams, - ClientRPCResponseResult - >(), - }, - streamFactory: async () => clientClient.connection.newStream(), - toError: networkUtils.toError, - middlewareFactory: rpcUtilsMiddleware.defaultClientMiddlewareWrapper( - timeoutMiddleware.timeoutMiddlewareClient, - ), - logger, - }); - - // Doing the test - const timer = new Timer({ - delay: 1000, - }); - await rpcClient.methods.testHandler({}, { timer }); - expect(timer.delay).toBe(100); - timer.cancel(); - }); -});