Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aligning Workers Integration with EFS and DB #263

Merged
merged 1 commit into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions src/PolykeyAgent.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { FileSystem } from './types';
import type { PolykeyWorkerManagerInterface } from './workers/types';
import type { Host, Port } from './network/types';

import path from 'path';
Expand All @@ -17,7 +18,6 @@ import { Sigchain } from './sigchain';
import { ACL } from './acl';
import { DB } from '@matrixai/db';
import { Discovery } from './discovery';
import { WorkerManager } from './workers';
import { SessionManager } from './sessions';
import { certNodeId } from './network/utils';
import { IdentitiesManager } from './identities';
Expand All @@ -30,6 +30,7 @@ import { GithubProvider } from './identities/providers';
import config from './config';
import { ErrorStateVersionMismatch } from './errors';
import { CreateDestroyStartStop } from '@matrixai/async-init/dist/CreateDestroyStartStop';
import { createWorkerManager } from './workers/utils';

interface Polykey extends CreateDestroyStartStop {}
@CreateDestroyStartStop(
Expand All @@ -46,7 +47,7 @@ class Polykey {
public readonly gestalts: GestaltGraph;
public readonly identities: IdentitiesManager;
public readonly notifications: NotificationsManager;
public readonly workers?: WorkerManager;
public readonly workers?: PolykeyWorkerManagerInterface;
public readonly sigchain: Sigchain;
public readonly acl: ACL;
public readonly db: DB;
Expand Down Expand Up @@ -111,7 +112,7 @@ class Polykey {
notificationsManager?: NotificationsManager;
acl?: ACL;
db?: DB;
workerManager?: WorkerManager | null;
workerManager?: PolykeyWorkerManagerInterface | null;
clientGrpcHost?: string;
agentGrpcHost?: string;
clientGrpcPort?: number;
Expand Down Expand Up @@ -193,12 +194,12 @@ class Polykey {
// Writing current version info.
await fs_.promises.writeFile(versionFilePath, JSON.stringify(config));

let workers_: WorkerManager | undefined = undefined;
let workers_: PolykeyWorkerManagerInterface | undefined = undefined;
if (workerManager !== null) {
logger_.info('Creating a WorkerManager');
workers_ =
workerManager ??
(await WorkerManager.createPolykeyWorkerManager({
(await createWorkerManager({
cores,
logger: logger_.getChild('WorkerManager'),
}));
Expand Down Expand Up @@ -298,7 +299,11 @@ class Polykey {
fs: fs_,
logger: logger_.getChild('VaultManager'),
}));
// Vaults_.setWorkerManager(workers_); FIXME, need to be able to set this.
// Setting the workerManager for vaults.
if (workers_ != null) {
logger_.info('Setting workerManager for vaults');
vaults_.setWorkerManager(workers_);
}
const identities_ =
identitiesManager ??
(await IdentitiesManager.createIdentitiesManager({
Expand Down Expand Up @@ -400,7 +405,7 @@ class Polykey {
notificationsManager: NotificationsManager;
acl: ACL;
db: DB;
workerManager?: WorkerManager;
workerManager?: PolykeyWorkerManagerInterface;
clientGrpcHost: string;
agentGrpcHost: string;
clientGrpcPort: number;
Expand Down Expand Up @@ -583,12 +588,14 @@ class Polykey {
await this.revProxy.stop();
await this.fwdProxy.stop();
await this.db.stop();
this.keys.unsetWorkerManager();
this.logger.info('Stopped Polykey');
}

public async destroy() {
this.logger.info('Destroying Polykey');
this.keys.unsetWorkerManager();
this.db.unsetWorkerManager();
this.vaults.unsetWorkerManager();
await this.agentGrpcServer.destroy();
await this.clientGrpcServer.destroy();
await this.sessions.destroy();
Expand Down
1 change: 0 additions & 1 deletion src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,5 @@ export * from './network/errors';
export * from './nodes/errors';
export * from './claims/errors';
export * from './sigchain/errors';
export * from './workers/errors';
export * from './bootstrap/errors';
export * from './notifications/errors';
6 changes: 3 additions & 3 deletions src/keys/KeyManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import type {
CertificatePemChain,
} from './types';
import type { FileSystem } from '../types';
import type { WorkerManager } from '../workers';
import type { NodeId } from '../nodes/types';
import type { PolykeyWorkerManagerInterface } from '../workers/types';

import path from 'path';
import Logger from '@matrixai/logger';
Expand Down Expand Up @@ -38,7 +38,7 @@ class KeyManager {
protected _dbKey: Buffer;
protected _vaultKey: Buffer;
protected rootCert: Certificate;
protected workerManager?: WorkerManager;
protected workerManager?: PolykeyWorkerManagerInterface;

static async createKeyManager({
keysPath,
Expand Down Expand Up @@ -93,7 +93,7 @@ class KeyManager {
this.vaultKeyPath = path.join(keysPath, 'vault.key');
}

public setWorkerManager(workerManager: WorkerManager) {
public setWorkerManager(workerManager: PolykeyWorkerManagerInterface) {
this.workerManager = workerManager;
}

Expand Down
2 changes: 0 additions & 2 deletions src/notifications/NotificationsManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import type { DB, DBLevel } from '@matrixai/db';
import type { KeyManager } from '../keys';
import type { NodeManager } from '../nodes';
import type { NodeId } from '../nodes/types';
import type { WorkerManager } from '../workers';

import Logger from '@matrixai/logger';
import { Mutex } from 'async-mutex';
Expand All @@ -34,7 +33,6 @@ class NotificationsManager {
protected db: DB;
protected keyManager: KeyManager;
protected nodeManager: NodeManager;
protected workerManager?: WorkerManager;

protected messageCap: number;

Expand Down
30 changes: 15 additions & 15 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,25 @@ type POJO = { [key: string]: any };
*/
type Opaque<K, T> = T & { __TYPE__: K };

/**
* Any type that can be turned into a string
*/
interface ToString {
toString(): string;
}

/**
* Allows extension of constructors that use POJOs
*/
type AbstractConstructorParameters<T> = ConstructorParameters<
(new (...args: any) => any) & T
>;

type Initial<T extends any[]> = T extends [...infer Head, any] ? Head : any[];
type InitialParameters<T extends (...args: any) => any> = Initial<
Parameters<T>
>;

/**
* Any type that can be turned into a string
*/
interface ToString {
toString(): string;
}

/**
* Wrap a type to be reference counted
* Useful for when we need to garbage collect data
Expand Down Expand Up @@ -69,20 +74,15 @@ type LockConfig = {
clientPort?: number | undefined;
} & POJO;

type Initial<T extends any[]> = T extends [...infer Head, any] ? Head : any[];
type InitialParameters<T extends (...args: any) => any> = Initial<
Parameters<T>
>;

export {
POJO,
Opaque,
ToString,
AbstractConstructorParameters,
Initial,
InitialParameters,
ToString,
Ref,
Timer,
FileSystem,
LockConfig,
Initial,
InitialParameters,
};
11 changes: 9 additions & 2 deletions src/vaults/VaultManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import type {
Vault,
} from './types';
import type { FileSystem } from '../types';
import type { WorkerManager } from '../workers';
import type { NodeId } from '../nodes/types';
import type { PolykeyWorkerManagerInterface } from '../workers/types';

import path from 'path';
import Logger from '@matrixai/logger';
Expand Down Expand Up @@ -50,7 +50,6 @@ class VaultManager {
protected efs: EncryptedFS;
protected db: DB;
protected logger: Logger;
protected workerManager?: WorkerManager;
protected vaultsKey: VaultKey;
protected vaultsMap: VaultMap;
protected vaultsDbDomain: string;
Expand Down Expand Up @@ -167,6 +166,14 @@ class VaultManager {
this.logger = logger ?? new Logger(this.constructor.name);
}

public setWorkerManager(workerManager: PolykeyWorkerManagerInterface) {
this.efs.setWorkerManager(workerManager);
}

public unsetWorkerManager() {
this.efs.unsetWorkerManager();
}

public async transaction<T>(
f: (vaultManager: VaultManager) => Promise<T>,
lock: MutexInterface,
Expand Down
25 changes: 0 additions & 25 deletions src/workers/WorkerManager.ts

This file was deleted.

7 changes: 0 additions & 7 deletions src/workers/errors.ts

This file was deleted.

6 changes: 4 additions & 2 deletions src/workers/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export { default as WorkerManager } from './WorkerManager';
export { default as polykeyWorker } from './polykeyWorkerModule';
export * as errors from './errors';
export * as utils from './utils';

export type { PolykeyWorkerModule } from './polykeyWorkerModule';
export type { PolykeyWorkerManagerInterface } from './types';
4 changes: 2 additions & 2 deletions src/workers/polykeyWorker.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { PolykeyWorker } from './polykeyWorkerModule';
import type { PolykeyWorkerModule } from './polykeyWorkerModule';
import { expose } from 'threads/worker';

import polykeyWorker from './polykeyWorkerModule';

expose(polykeyWorker);

export type { PolykeyWorker };
export type { PolykeyWorkerModule };
85 changes: 3 additions & 82 deletions src/workers/polykeyWorkerModule.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { TransferDescriptor } from 'threads';
import type { PublicKeyAsn1, PrivateKeyAsn1, KeyPairAsn1 } from '../keys/types';

import { Transfer } from 'threads/worker';
import { utils as keysUtils } from '../keys';
import { isWorkerRuntime, Transfer } from 'threads/worker';

/**
* Worker object that contains all functions that will be executed in parallel
Expand Down Expand Up @@ -32,24 +32,6 @@ const polykeyWorker = {
}
},

// Normal functions
/**
* Check if we are running in the worker.
* Only used for testing
*/
isRunningInWorker(): boolean {
return isWorkerRuntime();
},
/**
* Sleep synchronously
* This blocks the entire event loop
* Only used for testing
*/
sleep(ms: number): void {
Atomics.wait(new Int32Array(new SharedArrayBuffer(4)), 0, 0, ms);
return;
},

// KeyManager operations
/**
* Generate KeyPair
Expand Down Expand Up @@ -93,71 +75,10 @@ const polykeyWorker = {
const signed = keysUtils.verifyWithPublicKey(publicKey, data_, signature_);
return signed;
},

// These operations are "overwriting" the EFS ops
// they are using our crypto mechanisms
// during parallel execution
// unless we are injecting our own crypto mechanism in it
// so they are sharing the same system
// so it seems the proper way to do this
// would be ensure that we have a common interface for a crypto utility
// like an object containing the relevant operations

// DB operations
/**
* Zero copy encryption of plain text to cipher text
*/
async encryptWithKey(
key: ArrayBuffer,
keyOffset: number,
keyLength: number,
plainText: ArrayBuffer,
plainTextOffset: number,
plainTextLength: number,
): Promise<TransferDescriptor<[ArrayBuffer, number, number]>> {
const key_ = Buffer.from(key, keyOffset, keyLength);
const plainText_ = Buffer.from(plainText, plainTextOffset, plainTextLength);
const cipherText = Buffer.from(
await keysUtils.encryptWithKey(key_, plainText_),
);
return Transfer(
[cipherText.buffer, cipherText.byteOffset, cipherText.byteLength],
[cipherText.buffer],
);
},
/**
* Zero copy decryption of cipher text to plain text
*/
async decryptWithKey(
key: ArrayBuffer,
keyOffset: number,
keyLength: number,
cipherText: ArrayBuffer,
cipherTextOffset: number,
cipherTextLength: number,
): Promise<TransferDescriptor<[ArrayBuffer, number, number]> | undefined> {
const key_ = Buffer.from(key, keyOffset, keyLength);
const cipherText_ = Buffer.from(
cipherText,
cipherTextOffset,
cipherTextLength,
);
const plainText = Buffer.from(
(await keysUtils.decryptWithKey(key_, cipherText_))!,
);
if (plainText != null) {
return Transfer(
[plainText.buffer, plainText.byteOffset, plainText.byteLength],
[plainText.buffer],
);
} else {
return;
}
},
};

type PolykeyWorker = typeof polykeyWorker;
type PolykeyWorkerModule = typeof polykeyWorker;

export type { PolykeyWorker };
export type { PolykeyWorkerModule };

export default polykeyWorker;
Loading