diff --git a/src/metrics/brain-metrics.ts b/src/metrics/brain-metrics.ts new file mode 100644 index 0000000..d329d20 --- /dev/null +++ b/src/metrics/brain-metrics.ts @@ -0,0 +1,33 @@ +import type * as FN from '@soketi/impl/types'; +import { Brain } from '../brain'; +import { Metrics } from './metrics'; +import { Connections } from '../ws'; + +export class BrainMetrics extends Metrics { + metrics: FN.JSON.Object = {}; + + constructor( + readonly brain: Brain, + readonly connections: Connections, + ) { + super(connections); + } + + async snapshot(namespace: string): Promise { + this.snapshotInProgress = true; + + await this.brain.set(`metrics:${namespace}`, { + connections: this.connections.connections.size, + }); + + this.snapshotInProgress = false; + } + + async get(namespace: string): Promise { + return (await this.brain.get(`metrics:${namespace}`)) || {}; + } + + async cleanup(): Promise { + // + } +} diff --git a/src/metrics/index.ts b/src/metrics/index.ts new file mode 100644 index 0000000..7041181 --- /dev/null +++ b/src/metrics/index.ts @@ -0,0 +1,2 @@ +export * from './brain-metrics'; +export * from './metrics'; diff --git a/src/metrics/metrics.ts b/src/metrics/metrics.ts new file mode 100644 index 0000000..e856b88 --- /dev/null +++ b/src/metrics/metrics.ts @@ -0,0 +1,19 @@ +import type * as FN from '@soketi/impl/types'; +import { Connections } from '../ws'; + +export abstract class Metrics { + snapshotInProgress = false; + + constructor( + readonly connections: Connections, + ) { + // + } + + abstract snapshot(namespace: string): Promise; + abstract get(namespace: string): Promise; + + async cleanup(): Promise { + // + } +} diff --git a/src/pusher/index.ts b/src/pusher/index.ts index 2d85c9f..48bbe66 100644 --- a/src/pusher/index.ts +++ b/src/pusher/index.ts @@ -1,5 +1,6 @@ export * from './apps'; export * from './channels'; +export * from './metrics'; export * from './queue'; export * from './ws'; export * from './utils'; diff --git a/src/pusher/metrics/brain-metrics.ts b/src/pusher/metrics/brain-metrics.ts new file mode 100644 index 0000000..7dd0742 --- /dev/null +++ b/src/pusher/metrics/brain-metrics.ts @@ -0,0 +1,31 @@ +import { Brain } from '../../brain'; +import { BrainMetrics } from '../../metrics'; +import { PusherConnections } from '../ws'; + +export class PusherBrainMetrics extends BrainMetrics { + constructor( + readonly brain: Brain, + readonly connections: PusherConnections, + ) { + super(brain, connections); + } + + async snapshot(namespace: string): Promise { + this.snapshotInProgress = true; + + await this.brain.set(`metrics:${namespace}`, { + connections: this.connections.connections.size, + channels: [...this.connections.channels].map(([channel, connections]) => ({ + channel, + connections: connections.size, + })), + users: [...this.connections.users].map(([user, connections]) => ({ + user, + connections, + })), + started: this.connections.started.toISOString(), + }); + + this.snapshotInProgress = false; + } +} diff --git a/src/pusher/metrics/index.ts b/src/pusher/metrics/index.ts new file mode 100644 index 0000000..1a109da --- /dev/null +++ b/src/pusher/metrics/index.ts @@ -0,0 +1 @@ +export * from './brain-metrics'; diff --git a/tests/pusher/ws.test.ts b/tests/pusher/ws.test.ts index 690bd4a..d9dd144 100644 --- a/tests/pusher/ws.test.ts +++ b/tests/pusher/ws.test.ts @@ -6,6 +6,7 @@ import { PusherConnection, PusherConnections } from '../../src/pusher/ws'; import { describe, test, expect, beforeEach } from 'vitest'; import { createHmac } from 'crypto'; import { Brain, LocalBrain } from '../../src/brain'; +import { PusherBrainMetrics } from '../../src/pusher'; const pusherUtil = require('pusher/lib/util'); const Pusher = require('pusher'); @@ -192,6 +193,11 @@ describe('pusher/ws', () => { const conn = new PusherConnection('test', { send: async (message) => { if (message.indexOf('pusher:signin_success') !== -1) { + expect(message).toBe(JSON.stringify({ + event: 'pusher:signin_success', + data: await messageData(conn.id), + })); + await conns.terminateUserConnections(userData.id); } }, @@ -208,6 +214,69 @@ describe('pusher/ws', () => { data: await messageData(conn.id), }); })); + + test('join and leave triggers metrics change', async () => { + const app = await AppsRegistry.getById('app-id') as TestApp; + const conns = new LocalConnections(app, gossiper, brain); + const metrics = new PusherBrainMetrics(brain, conns); + + const conn = new PusherConnection('test', { + send: (message) => { + // + }, + close: (code, reason) => { + // + }, + }); + + await conns.newConnection(conn); + + await conns.subscribeToChannel(conn, { + event: 'pusher:subscribe', + data: { + channel: 'test', + }, + }); + + expect([...conn.subscribedChannels]).toEqual(['test']); + expect(conn.presence).toEqual(new Map()); + expect(conns.channels.get('test')).lengthOf(1); + + await metrics.snapshot(app.id); + expect(await metrics.get(app.id)).toEqual({ + connections: 1, + channels: [{ + channel: 'test', + connections: 1, + }], + users: [], + started: conns.started.toISOString(), + }); + + await conns.unsubscribeFromChannel(conn, 'test'); + + expect([...conn.subscribedChannels]).toEqual([]); + expect(conn.presence).toEqual(new Map()); + expect(conns.channels.get('test')).toBeUndefined(); + + await metrics.snapshot(app.id); + expect(await metrics.get(app.id)).toEqual({ + connections: 1, + channels: [], + users: [], + started: conns.started.toISOString(), + }); + + await conns.removeConnection(conn); + + await metrics.snapshot(app.id); + expect(await metrics.get(app.id)).toEqual({ + connections: 0, + channels: [], + users: [], + started: conns.started.toISOString(), + }); + }); }); class LocalConnections extends PusherConnections { diff --git a/tests/ws/router.test.ts b/tests/ws/router.test.ts index 79bbc7e..a213aa3 100644 --- a/tests/ws/router.test.ts +++ b/tests/ws/router.test.ts @@ -1,4 +1,6 @@ import { Connection, Connections, Router as WsRouter } from '../../src/ws'; +import { LocalBrain } from '../../src/brain'; +import { BrainMetrics } from '../../src/metrics'; import { describe, test, expect } from 'vitest'; describe('ws/router', () => { @@ -11,9 +13,18 @@ describe('ws/router', () => { const connections = new LocalConnections(); const conn = new Connection('test', { }); + const brain = new LocalBrain(); + const metrics = new BrainMetrics(brain, connections); await connections.newConnection(conn); WsRouter.handleNewConnection(conn); + + await metrics.snapshot('test'); + expect(await metrics.get('test')).toEqual({ + connections: 1, + }); + + expect(await metrics.get('test2')).toEqual({}); })); test('onConnectionClosed', () => new Promise(async (done) => {