feat: vacuum storage to reduce its size
This commit is contained in:
parent
b45cc0df69
commit
c8dae335e8
2 changed files with 100 additions and 29 deletions
|
@ -53,10 +53,13 @@ const USERNAME_TTL = 86400000 // 24 hours
|
|||
|
||||
export class MemoryStorage implements ITelegramStorage /*, IStateStorage */ {
|
||||
protected _state: MemorySessionState
|
||||
private _cachedInputPeers: Record<number, tl.TypeInputPeer> = {}
|
||||
private _cachedInputPeers: LruMap<number, tl.TypeInputPeer> = new LruMap(100)
|
||||
|
||||
private _cachedFull: LruMap<number, tl.TypeUser | tl.TypeChat>
|
||||
|
||||
private _vacuumTimeout: NodeJS.Timeout
|
||||
private _vacuumInterval: number
|
||||
|
||||
constructor(params?: {
|
||||
/**
|
||||
* Maximum number of cached full entities.
|
||||
|
@ -69,9 +72,31 @@ export class MemoryStorage implements ITelegramStorage /*, IStateStorage */ {
|
|||
* Defaults to `100`, use `0` to disable
|
||||
*/
|
||||
cacheSize?: number
|
||||
|
||||
/**
|
||||
* Interval in milliseconds for vacuuming the storage.
|
||||
*
|
||||
* When vacuuming, the storage will remove expired FSM
|
||||
* states to reduce memory usage.
|
||||
*
|
||||
* Defaults to `300_000` (5 minutes)
|
||||
*/
|
||||
vacuumInterval?: number
|
||||
}) {
|
||||
this.reset()
|
||||
this._cachedFull = new LruMap(params?.cacheSize ?? 100)
|
||||
this._vacuumInterval = params?.vacuumInterval ?? 300_000
|
||||
}
|
||||
|
||||
load(): void {
|
||||
this._vacuumTimeout = setInterval(
|
||||
this._vacuum.bind(this),
|
||||
this._vacuumInterval
|
||||
)
|
||||
}
|
||||
|
||||
destroy(): void {
|
||||
clearInterval(this._vacuumTimeout)
|
||||
}
|
||||
|
||||
reset(): void {
|
||||
|
@ -122,6 +147,30 @@ export class MemoryStorage implements ITelegramStorage /*, IStateStorage */ {
|
|||
this._state = obj
|
||||
}
|
||||
|
||||
private _vacuum(): void {
|
||||
// remove expired entities from fsm and rate limit storages
|
||||
|
||||
const now = Date.now()
|
||||
|
||||
// make references in advance to avoid lookups
|
||||
const state = this._state
|
||||
const fsm = state.fsm
|
||||
const rl = state.rl
|
||||
|
||||
Object.keys(fsm).forEach((key) => {
|
||||
const exp = fsm[key].e
|
||||
if (exp && exp < now) {
|
||||
delete fsm[key]
|
||||
}
|
||||
})
|
||||
|
||||
Object.keys(rl).forEach((key) => {
|
||||
if (rl[key].res < now) {
|
||||
delete rl[key]
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
getDefaultDc(): tl.RawDcOption | null {
|
||||
return this._state.defaultDc
|
||||
}
|
||||
|
@ -192,10 +241,10 @@ export class MemoryStorage implements ITelegramStorage /*, IStateStorage */ {
|
|||
}
|
||||
|
||||
getPeerById(peerId: number): tl.TypeInputPeer | null {
|
||||
if (peerId in this._cachedInputPeers)
|
||||
return this._cachedInputPeers[peerId]
|
||||
if (this._cachedInputPeers.has(peerId))
|
||||
return this._cachedInputPeers.get(peerId)!
|
||||
const peer = this._getInputPeer(this._state.entities[peerId])
|
||||
if (peer) this._cachedInputPeers[peerId] = peer
|
||||
if (peer) this._cachedInputPeers.set(peerId, peer)
|
||||
return peer
|
||||
}
|
||||
|
||||
|
@ -302,7 +351,7 @@ export class MemoryStorage implements ITelegramStorage /*, IStateStorage */ {
|
|||
if (!(key in this._state.rl)) {
|
||||
const state = {
|
||||
res: now + window * 1000,
|
||||
rem: limit
|
||||
rem: limit,
|
||||
}
|
||||
|
||||
this._state.rl[key] = state
|
||||
|
@ -315,7 +364,7 @@ export class MemoryStorage implements ITelegramStorage /*, IStateStorage */ {
|
|||
|
||||
const state = {
|
||||
res: now + window * 1000,
|
||||
rem: limit
|
||||
rem: limit,
|
||||
}
|
||||
|
||||
this._state.rl[key] = state
|
||||
|
|
|
@ -124,13 +124,6 @@ interface FsmItem {
|
|||
expires?: number
|
||||
}
|
||||
|
||||
interface RateLimitItem {
|
||||
// reset
|
||||
res: number
|
||||
// remaining
|
||||
rem: number
|
||||
}
|
||||
|
||||
const STATEMENTS = {
|
||||
getKv: 'select value from kv where key = ?',
|
||||
setKv: 'insert or replace into kv (key, value) values (?, ?)',
|
||||
|
@ -156,6 +149,8 @@ const STATEMENTS = {
|
|||
getEntById: 'select * from entities where id = ?',
|
||||
getEntByPhone: 'select * from entities where phone = ?',
|
||||
getEntByUser: 'select * from entities where username = ?',
|
||||
|
||||
delStaleState: 'delete from state where expires < ?'
|
||||
} as const
|
||||
|
||||
const EMPTY_BUFFER = Buffer.alloc(0)
|
||||
|
@ -175,7 +170,7 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ {
|
|||
|
||||
private _cache?: LruMap<number, CacheItem>
|
||||
private _fsmCache?: LruMap<string, FsmItem>
|
||||
private _rlCache?: LruMap<string, RateLimitItem>
|
||||
private _rlCache?: LruMap<string, FsmItem>
|
||||
|
||||
private _wal?: boolean
|
||||
|
||||
|
@ -183,6 +178,9 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ {
|
|||
|
||||
private _saveUnimportantLater: () => void
|
||||
|
||||
private _vacuumTimeout: NodeJS.Timeout
|
||||
private _vacuumInterval: number
|
||||
|
||||
/**
|
||||
* @param filename Database file name, or `:memory:` for in-memory DB
|
||||
* @param params
|
||||
|
@ -253,6 +251,16 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ {
|
|||
* Defaults to `30000` (30 sec)
|
||||
*/
|
||||
unimportantSavesDelay?: number
|
||||
|
||||
/**
|
||||
* Interval in milliseconds for vacuuming the storage.
|
||||
*
|
||||
* When vacuuming, the storage will remove expired FSM
|
||||
* states to reduce disk and memory usage.
|
||||
*
|
||||
* Defaults to `300_000` (5 minutes)
|
||||
*/
|
||||
vacuumInterval?: number
|
||||
}
|
||||
) {
|
||||
this._filename = filename
|
||||
|
@ -289,6 +297,9 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ {
|
|||
this._pendingUnimportant = {}
|
||||
}, params?.unimportantSavesDelay ?? 30000)
|
||||
|
||||
|
||||
this._vacuumInterval = params?.vacuumInterval ?? 300_000
|
||||
|
||||
// todo: add support for workers (idk if really needed, but still)
|
||||
}
|
||||
|
||||
|
@ -372,6 +383,11 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ {
|
|||
}
|
||||
}
|
||||
|
||||
private _vacuum(): void {
|
||||
this._statements.delStaleState.run(Date.now())
|
||||
// local caches aren't cleared because it would be too expensive
|
||||
}
|
||||
|
||||
load(): void {
|
||||
this._db = sqlite3(this._filename, {
|
||||
verbose: debug.enabled ? debug : null,
|
||||
|
@ -396,6 +412,11 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ {
|
|||
this._statements.updateCachedEnt.run(it)
|
||||
})
|
||||
})
|
||||
|
||||
this._vacuumTimeout = setInterval(
|
||||
this._vacuum.bind(this),
|
||||
this._vacuumInterval
|
||||
)
|
||||
}
|
||||
|
||||
save(): void {
|
||||
|
@ -409,6 +430,7 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ {
|
|||
|
||||
destroy(): void {
|
||||
this._db.close()
|
||||
clearInterval(this._vacuumTimeout)
|
||||
}
|
||||
|
||||
reset(): void {
|
||||
|
@ -653,39 +675,39 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ {
|
|||
// leaky bucket
|
||||
const now = Date.now()
|
||||
|
||||
let val: RateLimitItem | undefined = this._rlCache?.get(key)
|
||||
let val: FsmItem | undefined = this._rlCache?.get(key)
|
||||
const cached = val
|
||||
if (!val) {
|
||||
const got = this._statements.getState.get(`$rate_limit_${key}`)
|
||||
if (got) {
|
||||
val = JSON.parse(got.value)
|
||||
val = got
|
||||
}
|
||||
}
|
||||
|
||||
if (!val || val.res < now) {
|
||||
if (!val || val.expires! < now) {
|
||||
// expired or does not exist
|
||||
const state = {
|
||||
res: now + window * 1000,
|
||||
rem: limit
|
||||
const item: FsmItem = {
|
||||
expires: now + window * 1000,
|
||||
value: limit
|
||||
}
|
||||
|
||||
this._statements.setState.run(
|
||||
`$rate_limit_${key}`,
|
||||
JSON.stringify(state),
|
||||
null
|
||||
item.value,
|
||||
item.expires
|
||||
)
|
||||
this._rlCache?.set(key, state)
|
||||
this._rlCache?.set(key, item)
|
||||
|
||||
return [state.rem, state.res]
|
||||
return [item.value, item.expires!]
|
||||
}
|
||||
|
||||
if (val.rem > 0) {
|
||||
val.rem -= 1
|
||||
if (val.value > 0) {
|
||||
val.value -= 1
|
||||
|
||||
this._statements.setState.run(
|
||||
`$rate_limit_${key}`,
|
||||
JSON.stringify(val),
|
||||
null
|
||||
val.value,
|
||||
val.expires
|
||||
)
|
||||
if (!cached) {
|
||||
// add to cache
|
||||
|
@ -694,7 +716,7 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ {
|
|||
}
|
||||
}
|
||||
|
||||
return [val.rem, val.res]
|
||||
return [val.value, val.expires!]
|
||||
}
|
||||
|
||||
resetRateLimit(key: string): void {
|
||||
|
|
Loading…
Reference in a new issue