Skip to content

Commit

Permalink
Added metrics collectors for brain
Browse files Browse the repository at this point in the history
Added pusher metrics collectors for extra
  • Loading branch information
rennokki committed Jul 11, 2023
1 parent 898f62c commit b3c95fe
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 0 deletions.
33 changes: 33 additions & 0 deletions src/metrics/brain-metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import type * as FN from '@soketi/impl/types';
import { Brain } from '../brain';
import { Metrics } from './metrics';
import { Connections } from '../ws';

export class BrainMetrics extends Metrics {
metrics: FN.JSON.Object = {};

constructor(
readonly brain: Brain,
readonly connections: Connections,
) {
super(connections);
}

async snapshot(namespace: string): Promise<void> {
this.snapshotInProgress = true;

await this.brain.set(`metrics:${namespace}`, {
connections: this.connections.connections.size,
});

this.snapshotInProgress = false;
}

async get(namespace: string): Promise<FN.JSON.Object> {
return (await this.brain.get(`metrics:${namespace}`)) || {};
}

async cleanup(): Promise<void> {
//
}
}
2 changes: 2 additions & 0 deletions src/metrics/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './brain-metrics';
export * from './metrics';
19 changes: 19 additions & 0 deletions src/metrics/metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import type * as FN from '@soketi/impl/types';
import { Connections } from '../ws';

export abstract class Metrics {
snapshotInProgress = false;

constructor(
readonly connections: Connections,
) {
//
}

abstract snapshot(namespace: string): Promise<void>;
abstract get(namespace: string): Promise<FN.JSON.Object>;

async cleanup(): Promise<void> {
//
}
}
1 change: 1 addition & 0 deletions src/pusher/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export * from './apps';
export * from './channels';
export * from './metrics';
export * from './queue';
export * from './ws';
export * from './utils';
31 changes: 31 additions & 0 deletions src/pusher/metrics/brain-metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { Brain } from '../../brain';
import { BrainMetrics } from '../../metrics';
import { PusherConnections } from '../ws';

export class PusherBrainMetrics extends BrainMetrics {
constructor(
readonly brain: Brain,
readonly connections: PusherConnections,
) {
super(brain, connections);
}

async snapshot(namespace: string): Promise<void> {
this.snapshotInProgress = true;

await this.brain.set(`metrics:${namespace}`, {
connections: this.connections.connections.size,
channels: [...this.connections.channels].map(([channel, connections]) => ({
channel,
connections: connections.size,
})),
users: [...this.connections.users].map(([user, connections]) => ({
user,
connections,
})),
started: this.connections.started.toISOString(),
});

this.snapshotInProgress = false;
}
}
1 change: 1 addition & 0 deletions src/pusher/metrics/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './brain-metrics';
69 changes: 69 additions & 0 deletions tests/pusher/ws.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { PusherConnection, PusherConnections } from '../../src/pusher/ws';
import { describe, test, expect, beforeEach } from 'vitest';
import { createHmac } from 'crypto';
import { Brain, LocalBrain } from '../../src/brain';
import { PusherBrainMetrics } from '../../src/pusher';

const pusherUtil = require('pusher/lib/util');
const Pusher = require('pusher');
Expand Down Expand Up @@ -192,6 +193,11 @@ describe('pusher/ws', () => {
const conn = new PusherConnection('test', {
send: async (message) => {
if (message.indexOf('pusher:signin_success') !== -1) {
expect(message).toBe(JSON.stringify({
event: 'pusher:signin_success',
data: await messageData(conn.id),
}));

await conns.terminateUserConnections(userData.id);
}
},
Expand All @@ -208,6 +214,69 @@ describe('pusher/ws', () => {
data: await messageData(conn.id),
});
}));

test('join and leave triggers metrics change', async () => {
const app = await AppsRegistry.getById('app-id') as TestApp;
const conns = new LocalConnections(app, gossiper, brain);
const metrics = new PusherBrainMetrics(brain, conns);

const conn = new PusherConnection('test', {
send: (message) => {
//
},
close: (code, reason) => {
//
},
});

await conns.newConnection(conn);

await conns.subscribeToChannel(conn, {
event: 'pusher:subscribe',
data: {
channel: 'test',
},
});

expect([...conn.subscribedChannels]).toEqual(['test']);
expect(conn.presence).toEqual(new Map());
expect(conns.channels.get('test')).lengthOf(1);

await metrics.snapshot(app.id);
expect(await metrics.get(app.id)).toEqual({
connections: 1,
channels: [{
channel: 'test',
connections: 1,
}],
users: [],
started: conns.started.toISOString(),
});

await conns.unsubscribeFromChannel(conn, 'test');

expect([...conn.subscribedChannels]).toEqual([]);
expect(conn.presence).toEqual(new Map());
expect(conns.channels.get('test')).toBeUndefined();

await metrics.snapshot(app.id);
expect(await metrics.get(app.id)).toEqual({
connections: 1,
channels: [],
users: [],
started: conns.started.toISOString(),
});

await conns.removeConnection(conn);

await metrics.snapshot(app.id);
expect(await metrics.get(app.id)).toEqual({
connections: 0,
channels: [],
users: [],
started: conns.started.toISOString(),
});
});
});

class LocalConnections extends PusherConnections {
Expand Down
11 changes: 11 additions & 0 deletions tests/ws/router.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { Connection, Connections, Router as WsRouter } from '../../src/ws';
import { LocalBrain } from '../../src/brain';
import { BrainMetrics } from '../../src/metrics';
import { describe, test, expect } from 'vitest';

describe('ws/router', () => {
Expand All @@ -11,9 +13,18 @@ describe('ws/router', () => {

const connections = new LocalConnections();
const conn = new Connection('test', { });
const brain = new LocalBrain();
const metrics = new BrainMetrics(brain, connections);

await connections.newConnection(conn);
WsRouter.handleNewConnection(conn);

await metrics.snapshot('test');
expect(await metrics.get('test')).toEqual({
connections: 1,
});

expect(await metrics.get('test2')).toEqual({});
}));

test('onConnectionClosed', () => new Promise<void>(async (done) => {
Expand Down

0 comments on commit b3c95fe

Please sign in to comment.