diff --git a/packages/core/src/storage/memory.ts b/packages/core/src/storage/memory.ts index 577f27a3..1fd2c037 100644 --- a/packages/core/src/storage/memory.ts +++ b/packages/core/src/storage/memory.ts @@ -27,8 +27,24 @@ interface MemorySessionState { // channel pts pts: Record - // state for fsm (v = value, e = expires) - fsm: Record + // state for fsm + fsm: Record< + string, + { + // value + v: any + // expires + e?: number + } + > + + // state for rate limiter + rl: Record self: ITelegramStorage.SelfInfo | null } @@ -69,6 +85,7 @@ export class MemoryStorage implements ITelegramStorage /*, IStateStorage */ { gpts: null, pts: {}, fsm: {}, + rl: {}, self: null, } } @@ -277,4 +294,39 @@ export class MemoryStorage implements ITelegramStorage /*, IStateStorage */ { deleteCurrentScene(key: string): void { delete this._state.fsm[`$current_scene_${key}`] } + + getRateLimit(key: string, limit: number, window: number): [number, number] { + // leaky bucket + const now = Date.now() + + if (!(key in this._state.rl)) { + const state = { + res: now + window * 1000, + rem: limit + } + + this._state.rl[key] = state + return [state.rem, state.res] + } + + const item = this._state.rl[key] + if (item.res < now) { + // expired + + const state = { + res: now + window * 1000, + rem: limit + } + + this._state.rl[key] = state + return [state.rem, state.res] + } + + item.rem = item.rem > 0 ? item.rem - 1 : 0 + return [item.rem, item.res] + } + + resetRateLimit(key: string): void { + delete this._state.rl[key] + } } diff --git a/packages/dispatcher/src/state/index.ts b/packages/dispatcher/src/state/index.ts index 959b600c..cd809aee 100644 --- a/packages/dispatcher/src/state/index.ts +++ b/packages/dispatcher/src/state/index.ts @@ -1,3 +1,3 @@ export { IStateStorage } from './storage' export { StateKeyDelegate, defaultStateKeyDelegate } from './key' -export { UpdateState } from './update-state' +export { UpdateState, RateLimitError } from './update-state' diff --git a/packages/dispatcher/src/state/storage.ts b/packages/dispatcher/src/state/storage.ts index 1f6db59e..180fc74d 100644 --- a/packages/dispatcher/src/state/storage.ts +++ b/packages/dispatcher/src/state/storage.ts @@ -58,4 +58,25 @@ export interface IStateStorage { * @param key Key of the state, as defined by {@link StateKeyDelegate} */ deleteCurrentScene(key: string): MaybeAsync + + /** + * Get information about a rate limit. + * + * It is recommended that you use sliding window or leaky bucket + * to implement rate limiting ([learn more](https://konghq.com/blog/how-to-design-a-scalable-rate-limiting-algorithm/)), + * + * @param key Key of the rate limit + * @param limit Maximum number of requests in `window` + * @param window Window size in seconds + * @returns Tuple containing the number of remaining and + * unix time in ms when the user can try again + */ + getRateLimit(key: string, limit: number, window: number): MaybeAsync<[number, number]> + + /** + * Reset a rate limit. + * + * @param key Key of the rate limit + */ + resetRateLimit(key: string): MaybeAsync } diff --git a/packages/dispatcher/src/state/update-state.ts b/packages/dispatcher/src/state/update-state.ts index 16712ce4..7a416a42 100644 --- a/packages/dispatcher/src/state/update-state.ts +++ b/packages/dispatcher/src/state/update-state.ts @@ -1,4 +1,14 @@ import { IStateStorage } from './storage' +import { MtCuteError } from '@mtcute/client' + +/** + * Error thrown by `.throttle()` + */ +export class RateLimitError extends MtCuteError { + constructor (readonly reset: number) { + super(`You are being rate limited.`) + } +} /** * State of the current update. @@ -159,4 +169,35 @@ export class UpdateState { this._updateLocalKey() await this._storage.deleteCurrentScene(this._key) } + + /** + * Rate limit some handler + * + * > **Note**: `key` is used to prefix the local key + * > derived using the given key delegate. + * + * @param key Key of the rate limit + * @param limit Maximum number of requests in `window` + * @param window Window size in seconds + * @returns Tuple containing the number of remaining and + * unix time in ms when the user can try again + */ + async throttle(key: string, limit: number, window: number): Promise<[number, number]> { + const [remaining, reset] = await this._localStorage.getRateLimit(`${key}:${this._localKey}`, limit, window) + + if (!remaining) { + throw new RateLimitError(reset) + } + + return [remaining - 1, reset] + } + + /** + * Reset the rate limit + * + * @param key Key of the rate limit + */ + async resetRateLimit(key: string): Promise { + await this._localStorage.resetRateLimit(`${key}:${this._localKey}`) + } } diff --git a/packages/sqlite/index.ts b/packages/sqlite/index.ts index 5b5e50ba..0cededc6 100644 --- a/packages/sqlite/index.ts +++ b/packages/sqlite/index.ts @@ -124,6 +124,13 @@ interface FsmItem { expires?: number } +interface RateLimitItem { + // reset + res: number + // remaining + rem: number +} + const STATEMENTS = { getKv: 'select value from kv where key = ?', setKv: 'insert or replace into kv (key, value) values (?, ?)', @@ -168,6 +175,7 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ { private _cache?: LruMap private _fsmCache?: LruMap + private _rlCache?: LruMap private _wal?: boolean @@ -210,6 +218,19 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ { */ fsmCacheSize?: number + /** + * Rate limit states cache size, in number of keys. + * + * Recently created/used rate limits are cached + * in memory to avoid redundant database calls. + * If you are having problems with this (e.g. stale + * state in case of concurrent accesses), you + * can disable this by passing `0` + * + * Defaults to `100` + */ + rlCacheSize?: number + /** * By default, WAL mode is enabled, which * significantly improves performance. @@ -244,6 +265,10 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ { this._fsmCache = new LruMap(params?.fsmCacheSize ?? 100) } + if (params?.rlCacheSize !== 0) { + this._rlCache = new LruMap(params?.rlCacheSize ?? 100) + } + this._wal = !params?.disableWal this._saveUnimportantLater = throttle(() => { @@ -623,4 +648,57 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ { deleteCurrentScene(key: string): void { this.deleteState(`$current_scene_${key}`) } + + getRateLimit(key: string, limit: number, window: number): [number, number] { + // leaky bucket + const now = Date.now() + + let val: RateLimitItem | undefined = this._rlCache?.get(key) + const cached = val + if (!val) { + const got = this._statements.getState.get(`$rate_limit_${key}`) + if (got) { + val = JSON.parse(got.value) + } + } + + if (!val || val.res < now) { + // expired or does not exist + const state = { + res: now + window * 1000, + rem: limit + } + + this._statements.setState.run( + `$rate_limit_${key}`, + JSON.stringify(state), + null + ) + this._rlCache?.set(key, state) + + return [state.rem, state.res] + } + + if (val.rem > 0) { + val.rem -= 1 + + this._statements.setState.run( + `$rate_limit_${key}`, + JSON.stringify(val), + null + ) + if (!cached) { + // add to cache + // if cached, cache is updated since `val === cached` + this._rlCache?.set(key, val) + } + } + + return [val.rem, val.res] + } + + resetRateLimit(key: string): void { + this._rlCache?.delete(key) + this._statements.delState.run(`$rate_limit_${key}`) + } }