From c8dae335e82e1a372b8b51fa130698849921894e Mon Sep 17 00:00:00 2001 From: teidesu Date: Sun, 20 Jun 2021 18:18:06 +0300 Subject: [PATCH] feat: vacuum storage to reduce its size --- packages/core/src/storage/memory.ts | 61 +++++++++++++++++++++++--- packages/sqlite/index.ts | 68 +++++++++++++++++++---------- 2 files changed, 100 insertions(+), 29 deletions(-) diff --git a/packages/core/src/storage/memory.ts b/packages/core/src/storage/memory.ts index 1fd2c037..ca191cef 100644 --- a/packages/core/src/storage/memory.ts +++ b/packages/core/src/storage/memory.ts @@ -53,10 +53,13 @@ const USERNAME_TTL = 86400000 // 24 hours export class MemoryStorage implements ITelegramStorage /*, IStateStorage */ { protected _state: MemorySessionState - private _cachedInputPeers: Record = {} + private _cachedInputPeers: LruMap = new LruMap(100) private _cachedFull: LruMap + private _vacuumTimeout: NodeJS.Timeout + private _vacuumInterval: number + constructor(params?: { /** * Maximum number of cached full entities. @@ -69,9 +72,31 @@ export class MemoryStorage implements ITelegramStorage /*, IStateStorage */ { * Defaults to `100`, use `0` to disable */ cacheSize?: number + + /** + * Interval in milliseconds for vacuuming the storage. + * + * When vacuuming, the storage will remove expired FSM + * states to reduce memory usage. + * + * Defaults to `300_000` (5 minutes) + */ + vacuumInterval?: number }) { this.reset() this._cachedFull = new LruMap(params?.cacheSize ?? 100) + this._vacuumInterval = params?.vacuumInterval ?? 300_000 + } + + load(): void { + this._vacuumTimeout = setInterval( + this._vacuum.bind(this), + this._vacuumInterval + ) + } + + destroy(): void { + clearInterval(this._vacuumTimeout) } reset(): void { @@ -122,6 +147,30 @@ export class MemoryStorage implements ITelegramStorage /*, IStateStorage */ { this._state = obj } + private _vacuum(): void { + // remove expired entities from fsm and rate limit storages + + const now = Date.now() + + // make references in advance to avoid lookups + const state = this._state + const fsm = state.fsm + const rl = state.rl + + Object.keys(fsm).forEach((key) => { + const exp = fsm[key].e + if (exp && exp < now) { + delete fsm[key] + } + }) + + Object.keys(rl).forEach((key) => { + if (rl[key].res < now) { + delete rl[key] + } + }) + } + getDefaultDc(): tl.RawDcOption | null { return this._state.defaultDc } @@ -192,10 +241,10 @@ export class MemoryStorage implements ITelegramStorage /*, IStateStorage */ { } getPeerById(peerId: number): tl.TypeInputPeer | null { - if (peerId in this._cachedInputPeers) - return this._cachedInputPeers[peerId] + if (this._cachedInputPeers.has(peerId)) + return this._cachedInputPeers.get(peerId)! const peer = this._getInputPeer(this._state.entities[peerId]) - if (peer) this._cachedInputPeers[peerId] = peer + if (peer) this._cachedInputPeers.set(peerId, peer) return peer } @@ -302,7 +351,7 @@ export class MemoryStorage implements ITelegramStorage /*, IStateStorage */ { if (!(key in this._state.rl)) { const state = { res: now + window * 1000, - rem: limit + rem: limit, } this._state.rl[key] = state @@ -315,7 +364,7 @@ export class MemoryStorage implements ITelegramStorage /*, IStateStorage */ { const state = { res: now + window * 1000, - rem: limit + rem: limit, } this._state.rl[key] = state diff --git a/packages/sqlite/index.ts b/packages/sqlite/index.ts index 0cededc6..7db546a9 100644 --- a/packages/sqlite/index.ts +++ b/packages/sqlite/index.ts @@ -124,13 +124,6 @@ interface FsmItem { expires?: number } -interface RateLimitItem { - // reset - res: number - // remaining - rem: number -} - const STATEMENTS = { getKv: 'select value from kv where key = ?', setKv: 'insert or replace into kv (key, value) values (?, ?)', @@ -156,6 +149,8 @@ const STATEMENTS = { getEntById: 'select * from entities where id = ?', getEntByPhone: 'select * from entities where phone = ?', getEntByUser: 'select * from entities where username = ?', + + delStaleState: 'delete from state where expires < ?' } as const const EMPTY_BUFFER = Buffer.alloc(0) @@ -175,7 +170,7 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ { private _cache?: LruMap private _fsmCache?: LruMap - private _rlCache?: LruMap + private _rlCache?: LruMap private _wal?: boolean @@ -183,6 +178,9 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ { private _saveUnimportantLater: () => void + private _vacuumTimeout: NodeJS.Timeout + private _vacuumInterval: number + /** * @param filename Database file name, or `:memory:` for in-memory DB * @param params @@ -253,6 +251,16 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ { * Defaults to `30000` (30 sec) */ unimportantSavesDelay?: number + + /** + * Interval in milliseconds for vacuuming the storage. + * + * When vacuuming, the storage will remove expired FSM + * states to reduce disk and memory usage. + * + * Defaults to `300_000` (5 minutes) + */ + vacuumInterval?: number } ) { this._filename = filename @@ -289,6 +297,9 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ { this._pendingUnimportant = {} }, params?.unimportantSavesDelay ?? 30000) + + this._vacuumInterval = params?.vacuumInterval ?? 300_000 + // todo: add support for workers (idk if really needed, but still) } @@ -372,6 +383,11 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ { } } + private _vacuum(): void { + this._statements.delStaleState.run(Date.now()) + // local caches aren't cleared because it would be too expensive + } + load(): void { this._db = sqlite3(this._filename, { verbose: debug.enabled ? debug : null, @@ -396,6 +412,11 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ { this._statements.updateCachedEnt.run(it) }) }) + + this._vacuumTimeout = setInterval( + this._vacuum.bind(this), + this._vacuumInterval + ) } save(): void { @@ -409,6 +430,7 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ { destroy(): void { this._db.close() + clearInterval(this._vacuumTimeout) } reset(): void { @@ -653,39 +675,39 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ { // leaky bucket const now = Date.now() - let val: RateLimitItem | undefined = this._rlCache?.get(key) + let val: FsmItem | undefined = this._rlCache?.get(key) const cached = val if (!val) { const got = this._statements.getState.get(`$rate_limit_${key}`) if (got) { - val = JSON.parse(got.value) + val = got } } - if (!val || val.res < now) { + if (!val || val.expires! < now) { // expired or does not exist - const state = { - res: now + window * 1000, - rem: limit + const item: FsmItem = { + expires: now + window * 1000, + value: limit } this._statements.setState.run( `$rate_limit_${key}`, - JSON.stringify(state), - null + item.value, + item.expires ) - this._rlCache?.set(key, state) + this._rlCache?.set(key, item) - return [state.rem, state.res] + return [item.value, item.expires!] } - if (val.rem > 0) { - val.rem -= 1 + if (val.value > 0) { + val.value -= 1 this._statements.setState.run( `$rate_limit_${key}`, - JSON.stringify(val), - null + val.value, + val.expires ) if (!cached) { // add to cache @@ -694,7 +716,7 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ { } } - return [val.rem, val.res] + return [val.value, val.expires!] } resetRateLimit(key: string): void {