Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket for Client Service API #506

Merged
merged 23 commits into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
eac6153
feat: creating `ClientServer` implementation
tegefaulkes Feb 14, 2023
c433d92
feat: `ClientServer` supports backpressure
tegefaulkes Feb 17, 2023
0ed77d6
fix: switched `ClientServer` to `StartStop` and other fixes
tegefaulkes Feb 20, 2023
485e4a8
feat: creating `ClientClient` implementation
tegefaulkes Feb 20, 2023
52500ca
feat: client authenticates server
tegefaulkes Feb 21, 2023
eba92b2
tests: re-ordering tests
tegefaulkes Feb 21, 2023
e092e64
tests: tests for abrupt connection ending
tegefaulkes Feb 21, 2023
17cb110
feat: client rejects normal HTTP requests
tegefaulkes Feb 21, 2023
c21ebd1
feat: `ClientClient` connection timeout
tegefaulkes Feb 22, 2023
4ceb657
tests: testing abruptly dropped connections
tegefaulkes Feb 22, 2023
d80ad80
feat: `ClientClient` keepalive and heartbeat
tegefaulkes Feb 23, 2023
2d01f27
feat: `ClientServer` keepalive and heartbeat
tegefaulkes Feb 23, 2023
5da726e
feat: `IPv6` support
tegefaulkes Feb 23, 2023
d096ce2
fix: cleaning up and errors
tegefaulkes Feb 23, 2023
1641c7e
tests: cleaning up and fixing tests
tegefaulkes Feb 24, 2023
d2eea49
feat: stream handler callback provided with `ConnectionInfo`
tegefaulkes Feb 24, 2023
00a2094
fix: changed names of `ClientClient` and `ClientServer` to `WebSocket…
tegefaulkes Feb 27, 2023
0f9cbcd
fix: `WebSocketServer` using protected arrow functions for the `uWebs…
tegefaulkes Feb 27, 2023
4a4427a
feat: `WebSocketClient` now extends the `WebSocketStream` class for i…
tegefaulkes Feb 27, 2023
bf3627b
feat: `WebSocketServer` now extends the `WebSocketStream` class for i…
tegefaulkes Feb 27, 2023
953efbc
fix: separating the `websockets` code to its own domain
tegefaulkes Feb 28, 2023
f260d73
fix: abstracting `uWebsocket` server creation to its own protected me…
tegefaulkes Feb 28, 2023
c6feceb
feat: `WebSocketServer` now extends `EventTarget` with `connection`, …
tegefaulkes Feb 28, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
"tslib": "^2.4.0",
"tsyringe": "^4.7.0",
"utp-native": "^2.5.3",
"uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.19.0",
"ws": "^8.12.0"
},
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion src/RPC/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import type {
RawHandlerImplementation,
ServerHandlerImplementation,
UnaryHandlerImplementation,
ConnectionInfo,
} from './types';
import type { ReadableWritablePair } from 'stream/web';
import type { JSONValue } from '../types';
import type { ConnectionInfo } from '../network/types';
import type { RPCErrorEvent } from './utils';
import type { MiddlewareFactory } from './types';
import { ReadableStream } from 'stream/web';
Expand Down
2 changes: 1 addition & 1 deletion src/RPC/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type { JSONValue } from 'types';
import type { ContainerType } from 'RPC/types';
import type { ReadableStream } from 'stream/web';
import type { JsonRpcRequest } from 'RPC/types';
import type { ConnectionInfo } from '../network/types';
import type { ConnectionInfo } from './types';
import type { ContextCancellable } from '../contexts/types';

abstract class Handler<
Expand Down
22 changes: 21 additions & 1 deletion src/RPC/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { JSONValue } from '../types';
import type { ConnectionInfo } from '../network/types';
import type { ContextCancellable } from '../contexts/types';
import type { ReadableStream, ReadableWritablePair } from 'stream/web';
import type { Handler } from './handlers';
Expand All @@ -11,6 +10,8 @@ import type {
ClientCaller,
UnaryCaller,
} from './callers';
import type { NodeId } from '../nodes/types';
import type { Certificate } from '../keys/types';

/**
* This is the JSON RPC request object. this is the generic message type used for the RPC.
Expand Down Expand Up @@ -108,6 +109,24 @@ type JsonRpcMessage<T extends JSONValue = JSONValue> =
| JsonRpcRequest<T>
| JsonRpcResponse<T>;

/**
* Proxy connection information
* @property remoteNodeId - NodeId of the remote connecting node
* @property remoteCertificates - Certificate chain of the remote connecting node
* @property localHost - Proxy host of the local connecting node
* @property localPort - Proxy port of the local connecting node
* @property remoteHost - Proxy host of the remote connecting node
* @property remotePort - Proxy port of the remote connecting node
*/
type ConnectionInfo = Partial<{
remoteNodeId: NodeId;
remoteCertificates: Array<Certificate>;
localHost: string;
localPort: number;
remoteHost: string;
remotePort: number;
}>;

// Handler types
type HandlerImplementation<I, O> = (
input: I,
Expand Down Expand Up @@ -218,6 +237,7 @@ export type {
JsonRpcRequest,
JsonRpcResponse,
JsonRpcMessage,
ConnectionInfo,
HandlerImplementation,
RawHandlerImplementation,
DuplexHandlerImplementation,
Expand Down
28 changes: 28 additions & 0 deletions src/clientRPC/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { ErrorPolykey, sysexits } from '../errors';

class ErrorRPC<T> extends ErrorPolykey<T> {}

class ErrorRPCClient<T> extends ErrorRPC<T> {}

class ErrorClientAuthMissing<T> extends ErrorRPCClient<T> {
static description = 'Authorisation metadata is required but missing';
exitCode = sysexits.NOPERM;
}

class ErrorClientAuthFormat<T> extends ErrorRPCClient<T> {
static description = 'Authorisation metadata has invalid format';
exitCode = sysexits.USAGE;
}

class ErrorClientAuthDenied<T> extends ErrorRPCClient<T> {
static description = 'Authorisation metadata is incorrect or expired';
exitCode = sysexits.NOPERM;
}

export {
ErrorRPC,
ErrorRPCClient,
ErrorClientAuthMissing,
ErrorClientAuthFormat,
ErrorClientAuthDenied,
};
220 changes: 6 additions & 214 deletions src/clientRPC/utils.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,9 @@
import type { SessionToken } from '../sessions/types';
import type KeyRing from '../keys/KeyRing';
import type SessionManager from '../sessions/SessionManager';
import type { RPCRequestParams } from './types';
import type { JsonRpcRequest } from '../RPC/types';
import type { ReadableWritablePair } from 'stream/web';
import type Logger from '@matrixai/logger';
import type { ConnectionInfo, Host, Port } from '../network/types';
import type RPCServer from '../RPC/RPCServer';
import type { TLSSocket } from 'tls';
import type { Server } from 'https';
import type net from 'net';
import type https from 'https';
import { ReadableStream, WritableStream } from 'stream/web';
import WebSocket, { WebSocketServer } from 'ws';
import * as clientErrors from '../client/errors';
import { promise } from '../utils';
import type SessionManager from 'sessions/SessionManager';
import type KeyRing from 'keys/KeyRing';
import type { JsonRpcRequest } from 'RPC/types';
import type { SessionToken } from 'sessions/types';
import * as clientErrors from './errors';

async function authenticate(
sessionManager: SessionManager,
Expand Down Expand Up @@ -65,201 +54,4 @@ function encodeAuthFromPassword(password: string): string {
return `Basic ${encoded}`;
}

function readableFromWebSocket(
ws: WebSocket,
logger: Logger,
): ReadableStream<Uint8Array> {
return new ReadableStream<Uint8Array>({
start: (controller) => {
logger.info('starting');
const messageHandler = (data) => {
logger.debug(`message: ${data.toString()}`);
ws.pause();
const message = data as Buffer;
if (message.length === 0) {
logger.info('ENDING');
ws.removeAllListeners('message');
try {
controller.close();
} catch {
// Ignore already closed
}
return;
}
controller.enqueue(message);
};
ws.on('message', messageHandler);
ws.once('close', () => {
logger.info('closed');
ws.removeListener('message', messageHandler);
try {
controller.close();
} catch {
// Ignore already closed
}
});
ws.once('error', (e) => {
controller.error(e);
});
},
cancel: () => {
logger.info('cancelled');
ws.close();
},
pull: () => {
logger.debug('resuming');
ws.resume();
},
});
}

function writeableFromWebSocket(
ws: WebSocket,
holdOpen: boolean,
logger: Logger,
): WritableStream<Uint8Array> {
return new WritableStream<Uint8Array>({
start: (controller) => {
logger.info('starting');
ws.once('error', (e) => {
logger.error(`error: ${e}`);
controller.error(e);
});
ws.once('close', (code, reason) => {
logger.info(
`ws closing early! with code: ${code} and reason: ${reason.toString()}`,
);
controller.error(Error('TMP WebSocket Closed early'));
});
},
close: () => {
logger.info('stream closing');
ws.send(Buffer.from([]));
if (!holdOpen) ws.terminate();
},
abort: () => {
logger.info('aborting');
ws.close();
},
write: async (chunk, controller) => {
logger.debug(`writing: ${chunk?.toString()}`);
const wait = promise<void>();
ws.send(chunk, (e) => {
if (e != null) {
logger.error(`error: ${e}`);
controller.error(e);
}
wait.resolveP();
});
await wait.p;
},
});
}

function webSocketToWebStreamPair(
ws: WebSocket,
holdOpen: boolean,
logger: Logger,
): ReadableWritablePair<Uint8Array, Uint8Array> {
return {
readable: readableFromWebSocket(ws, logger.getChild('readable')),
writable: writeableFromWebSocket(ws, holdOpen, logger.getChild('writable')),
};
}

function startConnection(
host: string,
port: number,
logger: Logger,
): Promise<ReadableWritablePair<Uint8Array, Uint8Array>> {
const ws = new WebSocket(`wss://${host}:${port}`, {
// CheckServerIdentity: (
// servername: string,
// cert: WebSocket.CertMeta,
// ): boolean => {
// console.log('CHECKING IDENTITY');
// console.log(servername);
// console.log(cert);
// return false;
// },
rejectUnauthorized: false,
// Ca: tlsConfig.certChainPem
});
ws.once('close', () => logger.info('CLOSED'));
// Ws.once('upgrade', () => {
// // Const tlsSocket = request.socket as TLSSocket;
// // Console.log(tlsSocket.getPeerCertificate());
// logger.info('Test early cancellation');
// // Request.destroy(Error('some error'));
// // tlsSocket.destroy(Error('some error'));
// // ws.close(12345, 'some reason');
// // TODO: Use the existing verify method from the GRPC implementation
// // TODO: Have this emit an error on verification failure.
// // It's fine for the server side to close abruptly without error
// });
const prom = promise<ReadableWritablePair<Uint8Array, Uint8Array>>();
ws.once('open', () => {
logger.info('starting connection');
prom.resolveP(webSocketToWebStreamPair(ws, true, logger));
});
return prom.p;
}

function handleConnection(ws: WebSocket, logger: Logger): void {
ws.once('close', () => logger.info('CLOSED'));
const readable = readableFromWebSocket(ws, logger.getChild('readable'));
const writable = writeableFromWebSocket(
ws,
false,
logger.getChild('writable'),
);
void readable.pipeTo(writable).catch((e) => logger.error(e));
}

function createClientServer(
server: Server,
rpcServer: RPCServer,
logger: Logger,
) {
logger.info('created server');
const wss = new WebSocketServer({
server,
});
wss.on('error', (e) => logger.error(e));
logger.info('created wss');
wss.on('connection', (ws, req) => {
logger.info('connection!');
const socket = req.socket as TLSSocket;
const streamPair = webSocketToWebStreamPair(ws, false, logger);
rpcServer.handleStream(streamPair, {
localHost: socket.localAddress! as Host,
localPort: socket.localPort! as Port,
remoteCertificates: socket.getPeerCertificate(),
remoteHost: socket.remoteAddress! as Host,
remotePort: socket.remotePort! as Port,
} as unknown as ConnectionInfo);
});
wss.once('close', () => {
wss.removeAllListeners('error');
wss.removeAllListeners('connection');
});
return wss;
}

async function listen(server: https.Server, host?: string, port?: number) {
await new Promise<void>((resolve) => {
server.listen(port, host ?? '127.0.0.1', undefined, () => resolve());
});
const addressInfo = server.address() as net.AddressInfo;
return addressInfo.port;
}

export {
authenticate,
decodeAuth,
encodeAuthFromPassword,
startConnection,
handleConnection,
createClientServer,
listen,
};
export { authenticate, decodeAuth, encodeAuthFromPassword };
1 change: 1 addition & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ interface FileSystem {
readdir: typeof fs.promises.readdir;
rename: typeof fs.promises.rename;
open: typeof fs.promises.open;
mkdtemp: typeof fs.promises.mkdtemp;
};
constants: typeof fs.constants;
}
Expand Down
Loading