Skip to content

Commit

Permalink
refactor: use commit signals and pending values
Browse files Browse the repository at this point in the history
  • Loading branch information
limwa committed Jul 5, 2024
1 parent 0a68e52 commit 64afbeb
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 43 deletions.
4 changes: 2 additions & 2 deletions services/pulumi/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { apps } from "./services/databases/mongodb";
import "./services/ementas";
import { CommitSignal } from "./utils/pending";

apps.commit();
CommitSignal.globalParent.resolve();
67 changes: 26 additions & 41 deletions services/pulumi/resources/mongodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as pulumi from "@pulumi/pulumi";
import * as k8s from "@pulumi/kubernetes";
import * as crds from "#crds";
import { replicateTo } from "../utils/replicator";
import { CommitSignal, PendingValue } from "../utils/pending";

type Role =
// Database user roles
Expand Down Expand Up @@ -56,38 +57,36 @@ type Args<Databases extends string> = {

export class MongoDBCommunityController<
const Databases extends string
> extends pulumi.ComponentResource<{}> {
private name: string;
private namespace: pulumi.Input<string> | undefined;
private commitAction: PromiseWithResolvers<void>;
private committed = false;
> extends pulumi.ComponentResource<void> {
private name;
private namespace;

private users: pulumi.Input<crds.types.input.mongodbcommunity.v1.MongoDBCommunitySpecUsersArgs>[] =
[];

private operatorDependsOn: pulumi.Input<pulumi.Resource | undefined>[] = [];
public readonly commitSignal;
private users;
private operatorDependencies;

constructor(
name: string,
args: Args<Databases>,
opts?: pulumi.ComponentResourceOptions
) {
const commitAction = Promise.withResolvers<void>();
const commitSignal = new CommitSignal({ rejectIfNotCommitted: true });
super(
"niployments:mongodb:MongoDBCommunityController",
name,
{ commit: commitAction.promise },
{ commitSignal },
opts
);

this.name = name;
this.namespace = args?.namespace;
this.commitAction = commitAction;

const definedOperatorDependsOn = pulumi.output(this.commitAction.promise.then(() => {
const definedDeps = pulumi.output(this.operatorDependsOn).apply((deps) => deps.filter(pulumi.Resource.isInstance));
return definedDeps;
}).catch(() => pulumi.output([])));
this.commitSignal = commitSignal;
this.users = new PendingValue<pulumi.Input<crds.types.input.mongodbcommunity.v1.MongoDBCommunitySpecUsersArgs>[]>([], { commitSignal });
this.operatorDependencies = new PendingValue<pulumi.Input<pulumi.Resource | undefined>[]>([], { commitSignal });

const dependsOn = this.operatorDependencies.asOutput()
.apply((deps) => deps.filter(pulumi.Resource.isInstance));

const operatorName = `${this.name}-operator`;
new crds.mongodbcommunity.v1.MongoDBCommunity(
Expand All @@ -99,34 +98,16 @@ export class MongoDBCommunityController<
},
spec: args.mdbc?.spec && {
...args.mdbc?.spec,
users: pulumi.output(
this.commitAction.promise.then(() => this.users)
),
users: this.users.asOutput(),
},
},
{ parent: this, dependsOn: definedOperatorDependsOn }
{ parent: this, dependsOn }
);
}

protected async initialize({
commit,
}: {
commit: Promise<void>;
}): Promise<{}> {
await commit;
return {};
}

public commit() {
if (this.committed) {
pulumi.log.warn(
"MongoDBCommunityController has already been committed. This may cause some users to not be properly added to the replica set.",
this
);
}

this.committed = true;
this.commitAction.resolve();
protected initialize({ commitSignal }: { commitSignal: CommitSignal }): Promise<void> {
commitSignal.resource = this;
return commitSignal.waitForCommit();
}

public addUser(user: User<Databases>) {
Expand Down Expand Up @@ -167,8 +148,12 @@ export class MongoDBCommunityController<
} satisfies crds.types.input.mongodbcommunity.v1.MongoDBCommunitySpecUsersArgs)
);

this.users.push(userSpec);
this.users.run(users => users.push(userSpec));

return this;
}

public addOperatorDependency(dependency: pulumi.Input<pulumi.Resource | undefined>) {
return this.operatorDependencies.run(dependencies => dependencies.push(dependency));
}
}
110 changes: 110 additions & 0 deletions services/pulumi/utils/pending.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import * as pulumi from "@pulumi/pulumi";

type CommitSignalOptions = {
resource?: pulumi.Resource;
rejectIfNotCommitted?: boolean;
}

const warnMessage = "This may affect the deployment of certain resources.";

export class CommitSignal {
public static globalParent = new CommitSignal();

private committed = false;
private action = Promise.withResolvers<void>();

public resource;

constructor(opts?: CommitSignalOptions) {
this.resource = opts?.resource;

this.action.promise.finally(() => {
if (this.committed)
return;

pulumi.log.error(`Commit was not committed. ${warnMessage}`, this.resource);
});

if (this !== CommitSignal.globalParent)
this.attachTo(CommitSignal.globalParent, opts?.rejectIfNotCommitted);
}

public waitForCommit() {
return this.action.promise;
}

public resolve() {
if (this.committed) {
pulumi.log.error(`Commit resolved after being committed. ${warnMessage}`, this.resource);
return;
}

this.committed = true;
this.action.resolve();
}

public reject(error: Error) {
if (this.committed) {
pulumi.log.error(`Commit rejected after being committed. ${warnMessage}`, this.resource);
return;
}

this.committed = true;
this.action.reject(error);
}

public attachTo(parent: CommitSignal, rejectIfNotCommitted?: boolean) {
if (parent.committed) {
pulumi.log.warn(`Commit attached when parent was already committed. ${warnMessage}`, this.resource);
return;
}

rejectIfNotCommitted ??= false;

parent.action.promise
.then(() => {
if (this.committed)
return;

if (rejectIfNotCommitted) {
this.reject(new Error("Commit was not committed before parent."));
} else {
this.resolve();
}
})
.catch(err => {
if (this.committed)
return;

this.reject(err);
});
}
}

type PendingValueOptions = { commitSignal?: CommitSignal }

export class PendingValue<T> {

public readonly commit;

constructor(private value: T, opts?: PendingValueOptions) {
this.commit = opts?.commitSignal ?? new CommitSignal();
}

public run(func: (value: T) => void) {
func(this.get());
}

public get() {
return this.value;
}

public asOutput() {
return pulumi.output(this.waitForValue());
}

public async waitForValue() {
await this.commit.waitForCommit();
return this.value;
}
}
1 change: 1 addition & 0 deletions services/pulumi/utils/pulumi.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import * as pulumi from "@pulumi/pulumi";
import { CommitSignal } from "./pending";

type RecursivePulumiInput<T> =
T extends object
Expand Down

0 comments on commit 64afbeb

Please sign in to comment.