diff --git a/packages/core/src/network/client.ts b/packages/core/src/network/client.ts index e59c93b1..f60ff5aa 100644 --- a/packages/core/src/network/client.ts +++ b/packages/core/src/network/client.ts @@ -103,24 +103,6 @@ export interface MtClientOptions { */ reconnectionStrategy?: ReconnectionStrategy - /** - * Maximum duration of a flood_wait that will be waited automatically. - * Flood waits above this threshold will throw a FloodWaitError. - * Set to 0 to disable. Can be overridden with `throwFlood` parameter in call() params - * - * @default 10000 - */ - floodSleepThreshold?: number - - /** - * Maximum number of retries when calling RPC methods. - * Call is retried when InternalError or FloodWaitError is encountered. - * Can be set to Infinity. - * - * @default 5 - */ - maxRetryCount?: number - /** * If true, all API calls will be wrapped with `tl.invokeWithoutUpdates`, * effectively disabling the server-sent events for the clients. @@ -287,8 +269,6 @@ export class MtClient extends EventEmitter { testMode: Boolean(params.testMode), transport: params.transport, emitError: this.emitError.bind(this), - floodSleepThreshold: params.floodSleepThreshold ?? 10000, - maxRetryCount: params.maxRetryCount ?? 5, isPremium: false, useIpv6: Boolean(params.useIpv6), enableErrorReporting: params.enableErrorReporting ?? false, diff --git a/packages/core/src/network/middlewares/bundle.ts b/packages/core/src/network/middlewares/bundle.ts index 00f48163..67968f23 100644 --- a/packages/core/src/network/middlewares/bundle.ts +++ b/packages/core/src/network/middlewares/bundle.ts @@ -1,2 +1,3 @@ +export * from './default.js' export * from './on-error.js' export * from './on-method.js' diff --git a/packages/core/src/network/middlewares/default.ts b/packages/core/src/network/middlewares/default.ts new file mode 100644 index 00000000..6a904910 --- /dev/null +++ b/packages/core/src/network/middlewares/default.ts @@ -0,0 +1,12 @@ +import { RpcCallMiddleware } from '../network-manager.js' +import { floodWaiter, FloodWaiterOptions } from './flood-waiter.js' +import { internalErrorsHandler, InternalErrorsHandlerOptions } from './internal-errors.js' + +export interface BasicMiddlewaresOptions { + floodWaiter?: FloodWaiterOptions + internalErrors?: InternalErrorsHandlerOptions +} + +export const basic = (options?: BasicMiddlewaresOptions): RpcCallMiddleware[] => { + return [floodWaiter(options?.floodWaiter ?? {}), internalErrorsHandler(options?.internalErrors ?? {})] +} diff --git a/packages/core/src/network/middlewares/flood-waiter.ts b/packages/core/src/network/middlewares/flood-waiter.ts new file mode 100644 index 00000000..e4bed0da --- /dev/null +++ b/packages/core/src/network/middlewares/flood-waiter.ts @@ -0,0 +1,128 @@ +import { mtp } from '@mtcute/tl' + +import { combineAbortSignals } from '../../utils/abort-signal.js' +import { sleepWithAbort } from '../../utils/misc-utils.js' +import { isTlRpcError } from '../../utils/type-assertions.js' +import { RpcCallMiddleware } from '../network-manager.js' + +export interface FloodWaiterOptions { + /** + * Maximum number of ms to wait when a FLOOD_WAIT_X + * error is encountered. If the wait time is greater + * than this value, the request will throw an error instead + * + * This can be overwritten on a per-request basis by setting + * `floodSleepThreshold` in the request parameters + * + * @default 10_000 + */ + maxWait?: number + + /** + * Maximum number of retries to perform when a FLOOD_WAIT_X + * error is encountered. After this number of retries, the + * last error will be thrown + * + * @default 5 + */ + maxRetries?: number + + /** + * Whether to store the last flood wait time and delay + * the consecutive requests accordingly + * + * @default true + */ + store?: boolean + + /** + * If the stored wait time is less than this value, + * the request will not be delayed + * + * @default 3_000 + */ + minStoredWait?: number +} + +export function floodWaiter(options: FloodWaiterOptions): RpcCallMiddleware { + const { maxWait = 10_000, maxRetries = 5, store = true, minStoredWait = 3_000 } = options + + const storage = new Map() + + return async (ctx, next) => { + // do not send requests that are in flood wait + const method = ctx.request._ + const storedWaitUntil = store ? storage.get(method) : undefined + const floodSleepThreshold = ctx.params?.floodSleepThreshold ?? maxWait + + if (storedWaitUntil !== undefined) { + const delta = storedWaitUntil - Date.now() + + if (delta <= minStoredWait) { + // flood waits below 3 seconds are "ignored" + storage.delete(method) + } else if (delta <= floodSleepThreshold) { + await sleepWithAbort(delta, combineAbortSignals(ctx.manager.params.stopSignal, ctx.params?.abortSignal)) + storage.delete(method) + } else { + return { + _: 'mt_rpc_error', + errorCode: 420, + errorMessage: `FLOOD_WAIT_${Math.ceil(delta / 1000)}`, + } satisfies mtp.RawMt_rpc_error + } + } + + let lastError: mtp.RawMt_rpc_error | undefined + + for (let i = 0; i <= maxRetries; i++) { + const res = await next(ctx) + + if (!isTlRpcError(res)) { + return res + } + + lastError = res + + const err = res.errorMessage + + if ( + err.startsWith('FLOOD_WAIT_') || + err.startsWith('SLOWMODE_WAIT_') || + err.startsWith('FLOOD_TEST_PHONE_WAIT_') + ) { + let seconds = Number(err.slice(err.lastIndexOf('_') + 1)) + + if (Number.isNaN(seconds)) { + ctx.manager._log.warn('invalid flood wait error received: %s, ignoring', err) + + return res + } + + if (store && !err.startsWith('SLOWMODE_WAIT_')) { + // SLOW_MODE_WAIT is per-chat, not per-request + storage.set(method, Date.now() + seconds * 1000) + } + + // In test servers, FLOOD_WAIT_0 has been observed, and sleeping for + // such a short amount will cause retries very fast leading to issues + if (seconds === 0) { + seconds = 1 + } + + const ms = seconds * 1000 + + if (ms <= floodSleepThreshold) { + ctx.manager._log.warn('%s resulted in a flood wait, will retry in %d seconds', method, seconds) + await sleepWithAbort( + ms, + combineAbortSignals(ctx.manager.params.stopSignal, ctx.params?.abortSignal), + ) + continue + } + } + } + + return lastError + } +} diff --git a/packages/core/src/network/middlewares/internal-errors.ts b/packages/core/src/network/middlewares/internal-errors.ts new file mode 100644 index 00000000..3b0e31ea --- /dev/null +++ b/packages/core/src/network/middlewares/internal-errors.ts @@ -0,0 +1,71 @@ +import { tl } from '@mtcute/tl' + +import { MtTimeoutError } from '../../types/errors.js' +import { combineAbortSignals } from '../../utils/abort-signal.js' +import { sleepWithAbort } from '../../utils/misc-utils.js' +import { isTlRpcError } from '../../utils/type-assertions.js' +import { RpcCallMiddleware } from '../network-manager.js' + +const CLIENT_ERRORS = /* #__PURE__ */ new Set([ + tl.RpcError.BAD_REQUEST, + tl.RpcError.UNAUTHORIZED, + tl.RpcError.FORBIDDEN, + tl.RpcError.NOT_FOUND, + tl.RpcError.FLOOD, + tl.RpcError.SEE_OTHER, + tl.RpcError.NOT_ACCEPTABLE, +]) + +export interface InternalErrorsHandlerOptions { + /** + * Maximum number of retries to perform when an internal server error is encountered. + * + * @default Infinity + */ + maxRetries?: number + + /** + * Number of seconds to wait before retrying + * when an internal server error is encountered. + * + * @default 1 + */ + waitTime?: number +} + +export function internalErrorsHandler(params: InternalErrorsHandlerOptions): RpcCallMiddleware { + const { maxRetries = Infinity, waitTime = 1 } = params + + return async (ctx, next) => { + const numRetries = ctx.params?.maxRetryCount ?? maxRetries + + for (let i = 0; i <= numRetries; i++) { + const res = await next(ctx) + if (!isTlRpcError(res)) return res + + if (!CLIENT_ERRORS.has(res.errorCode)) { + if (ctx.params?.throw503 && res.errorCode === -503) { + throw new MtTimeoutError() + } + + const waitSeconds = res.errorMessage === 'WORKER_BUSY_TOO_LONG_RETRY' ? Math.max(1, waitTime) : waitTime + + ctx.manager._log.warn( + 'Telegram is having internal issues: %d:%s, retrying in %ds', + res.errorCode, + res.errorMessage, + waitSeconds, + ) + + if (waitSeconds > 0) { + await sleepWithAbort( + waitSeconds * 1000, + combineAbortSignals(ctx.manager.params.stopSignal, ctx.params?.abortSignal), + ) + } + } + + return res + } + } +} diff --git a/packages/core/src/network/network-manager.ts b/packages/core/src/network/network-manager.ts index 33765100..c1c2b84a 100644 --- a/packages/core/src/network/network-manager.ts +++ b/packages/core/src/network/network-manager.ts @@ -3,18 +3,12 @@ import { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime' import { getPlatform } from '../platform.js' import { StorageManager } from '../storage/storage.js' -import { MtArgumentError, MtcuteError, MtTimeoutError, MtUnsupportedError } from '../types/index.js' +import { MtArgumentError, MtcuteError, MtUnsupportedError } from '../types/index.js' import { ComposedMiddleware, composeMiddlewares, Middleware } from '../utils/composer.js' -import { - ControllablePromise, - createControllablePromise, - DcOptions, - ICryptoProvider, - Logger, - sleepWithAbort, -} from '../utils/index.js' +import { ControllablePromise, createControllablePromise, DcOptions, ICryptoProvider, Logger } from '../utils/index.js' import { assertTypeIs, isTlRpcError } from '../utils/type-assertions.js' import { ConfigManager } from './config-manager.js' +import { basic as defaultMiddlewares } from './middlewares/default.js' import { MultiSessionConnection } from './multi-session-connection.js' import { PersistentConnectionParams } from './persistent-connection.js' import { defaultReconnectionStrategy, ReconnectionStrategy } from './reconnection.js' @@ -24,16 +18,6 @@ import { TransportFactory } from './transports/index.js' export type ConnectionKind = 'main' | 'upload' | 'download' | 'downloadSmall' -const CLIENT_ERRORS = { - [tl.RpcError.BAD_REQUEST]: 1, - [tl.RpcError.UNAUTHORIZED]: 1, - [tl.RpcError.FORBIDDEN]: 1, - [tl.RpcError.NOT_FOUND]: 1, - [tl.RpcError.FLOOD]: 1, - [tl.RpcError.SEE_OTHER]: 1, - [tl.RpcError.NOT_ACCEPTABLE]: 1, -} - /** * Params passed into {@link NetworkManager} by {@link TelegramClient}. * This type is intended for internal usage only. @@ -48,8 +32,6 @@ export interface NetworkManagerParams { initConnectionOptions?: Partial> transport: TransportFactory reconnectionStrategy?: ReconnectionStrategy - floodSleepThreshold: number - maxRetryCount: number disableUpdates?: boolean testMode: boolean layer: number @@ -132,7 +114,7 @@ export interface RpcCallOptions { * * If set to `0`, the call will not be retried. * - * @default {@link BaseTelegramClientOptions.floodSleepThreshold} + * Only applies when the flood waiter middleware is enabled. */ floodSleepThreshold?: number @@ -140,7 +122,8 @@ export interface RpcCallOptions { * If the call results in an internal server error or a flood wait, * the maximum amount of times to retry the call. * - * @default {@link BaseTelegramClientOptions.maxRetryCount} + * Only applies when the flood waiter middleware and/or + * internal errors handler middleware is enabled. */ maxRetryCount?: number @@ -185,6 +168,9 @@ export interface RpcCallOptions { * * Useful for methods like `messages.getBotCallbackAnswer` that reliably return * -503 in case the upstream bot failed to respond. + * + * Only applies if the internal error handler middleware is enabled, + * otherwise -503 is always thrown. */ throw503?: boolean @@ -776,7 +762,11 @@ export class NetworkManager { ) => Promise private _composeCall = (middlewares?: Middleware[]) => { - if (!middlewares?.length) { + if (!middlewares) { + middlewares = defaultMiddlewares() + } + + if (!middlewares.length) { return this._call } @@ -793,7 +783,6 @@ export class NetworkManager { }) } - private _floodWaitedRequests = new Map() private _call = async ( message: T, params?: RpcCallOptions, @@ -802,29 +791,6 @@ export class NetworkManager { throw new MtcuteError('Not connected to any DC') } - const floodSleepThreshold = params?.floodSleepThreshold ?? this.params.floodSleepThreshold - const maxRetryCount = params?.maxRetryCount ?? this.params.maxRetryCount - const throw503 = params?.throw503 ?? false - - // do not send requests that are in flood wait - if (this._floodWaitedRequests.has(message._)) { - const delta = this._floodWaitedRequests.get(message._)! - Date.now() - - if (delta <= 3000) { - // flood waits below 3 seconds are "ignored" - this._floodWaitedRequests.delete(message._) - } else if (delta <= this.params.floodSleepThreshold) { - await sleepWithAbort(delta, this.params.stopSignal) - this._floodWaitedRequests.delete(message._) - } else { - const err = tl.RpcError.create(tl.RpcError.FLOOD, 'FLOOD_WAIT_%d') - err.seconds = Math.ceil(delta / 1000) - throw err - } - } - - let lastError: mtp.RawMt_rpc_error | null = null - const kind = params?.kind ?? 'main' let manager: DcConnectionManager @@ -838,102 +804,53 @@ export class NetworkManager { let multi = manager[kind] - for (let i = 0; i < maxRetryCount; i++) { - const res = await multi.sendRpc(message, params?.timeout, params?.abortSignal, params?.chainId) + let res = await multi.sendRpc(message, params?.timeout, params?.abortSignal, params?.chainId) - if (!isTlRpcError(res)) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-return - return res - } + if (!isTlRpcError(res)) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-return + return res + } - lastError = res - - const err = res.errorMessage - - if (!(res.errorCode in CLIENT_ERRORS)) { - if (throw503 && res.errorCode === -503) { - throw new MtTimeoutError() - } - - this._log.warn('Telegram is having internal issues: %d:%s, retrying', res.errorCode, err) - - if (err === 'WORKER_BUSY_TOO_LONG_RETRY') { - // according to tdlib, "it is dangerous to resend query without timeout, so use 1" - await sleepWithAbort(1000, this.params.stopSignal) - } - continue - } + const err = res.errorMessage + if (manager === this._primaryDc) { if ( - err.startsWith('FLOOD_WAIT_') || - err.startsWith('SLOWMODE_WAIT_') || - err.startsWith('FLOOD_TEST_PHONE_WAIT_') + err.startsWith('PHONE_MIGRATE_') || + err.startsWith('NETWORK_MIGRATE_') || + err.startsWith('USER_MIGRATE_') ) { - let seconds = Number(err.lastIndexOf('_') + 1) + const newDc = Number(err.slice(err.lastIndexOf('_') + 1)) - if (Number.isNaN(seconds)) { - this._log.warn('invalid flood wait error received: %s, ignoring', err) + if (Number.isNaN(newDc)) { + this._log.warn('invalid migrate error received: %s, ignoring', err) return res } - if (!err.startsWith('SLOWMODE_WAIT_')) { - // SLOW_MODE_WAIT is chat-specific, not request-specific - this._floodWaitedRequests.set(message._, Date.now() + seconds * 1000) + if (params?.localMigrate) { + manager = await this._getOtherDc(newDc) + } else { + this._log.info('received %s, migrating to dc %d', err, newDc) + + await this.changePrimaryDc(newDc) + manager = this._primaryDc! } - // In test servers, FLOOD_WAIT_0 has been observed, and sleeping for - // such a short amount will cause retries very fast leading to issues - if (seconds === 0) { - seconds = 1 - } + multi = manager[kind] - if (seconds <= floodSleepThreshold) { - this._log.warn('%s resulted in a flood wait, will retry in %d seconds', message._, seconds) - await sleepWithAbort(seconds * 1000, this.params.stopSignal) - continue - } + res = await multi.sendRpc(message, params?.timeout, params?.abortSignal, params?.chainId) } + } else if (err === 'AUTH_KEY_UNREGISTERED') { + // we can try re-exporting auth from the primary connection + this._log.warn('exported auth key error, trying re-exporting..') - if (manager === this._primaryDc) { - if ( - err.startsWith('PHONE_MIGRATE_') || - err.startsWith('NETWORK_MIGRATE_') || - err.startsWith('USER_MIGRATE_') - ) { - const newDc = Number(err.slice(err.lastIndexOf('_') + 1)) + await this._exportAuthTo(manager) - if (Number.isNaN(newDc)) { - this._log.warn('invalid migrate error received: %s, ignoring', err) - - return res - } - - if (params?.localMigrate) { - manager = await this._getOtherDc(newDc) - } else { - this._log.info('received %s, migrating to dc %d', err, newDc) - - await this.changePrimaryDc(newDc) - manager = this._primaryDc! - } - - multi = manager[kind] - - continue - } - } else if (err === 'AUTH_KEY_UNREGISTERED') { - // we can try re-exporting auth from the primary connection - this._log.warn('exported auth key error, trying re-exporting..') - - await this._exportAuthTo(manager) - continue - } - - return res + res = await multi.sendRpc(message, params?.timeout, params?.abortSignal, params?.chainId) } - return lastError! + // eslint-disable-next-line @typescript-eslint/no-unsafe-return + return res } changeTransport(factory: TransportFactory): void { diff --git a/packages/core/src/utils/abort-signal.ts b/packages/core/src/utils/abort-signal.ts new file mode 100644 index 00000000..876d4c0c --- /dev/null +++ b/packages/core/src/utils/abort-signal.ts @@ -0,0 +1,10 @@ +export function combineAbortSignals(signal1: AbortSignal, signal2?: AbortSignal): AbortSignal { + if (!signal2) return signal1 + + const controller = new AbortController() + + signal1.addEventListener('abort', () => controller.abort()) + signal2.addEventListener('abort', () => controller.abort()) + + return controller.signal +}