From c5438a2f2978fee57dcf5987e027cfc2c5a6b527 Mon Sep 17 00:00:00 2001 From: alina sireneva Date: Sun, 7 Jul 2024 03:06:12 +0300 Subject: [PATCH 1/2] feat(core): outgoing request middlewares --- packages/core/src/highlevel/base.ts | 10 +- packages/core/src/network/index.ts | 9 +- .../core/src/network/middlewares/bundle.ts | 2 + .../core/src/network/middlewares/index.ts | 3 + .../core/src/network/middlewares/on-error.ts | 49 +++++ .../core/src/network/middlewares/on-method.ts | 54 ++++++ packages/core/src/network/network-manager.ts | 49 ++++- packages/core/src/utils/composer.test.ts | 179 ++++++++++++++++++ packages/core/src/utils/composer.ts | 30 +++ packages/core/src/utils/index.ts | 1 + 10 files changed, 377 insertions(+), 9 deletions(-) create mode 100644 packages/core/src/network/middlewares/bundle.ts create mode 100644 packages/core/src/network/middlewares/index.ts create mode 100644 packages/core/src/network/middlewares/on-error.ts create mode 100644 packages/core/src/network/middlewares/on-method.ts create mode 100644 packages/core/src/utils/composer.test.ts create mode 100644 packages/core/src/utils/composer.ts diff --git a/packages/core/src/highlevel/base.ts b/packages/core/src/highlevel/base.ts index c507195a..f6708d9a 100644 --- a/packages/core/src/highlevel/base.ts +++ b/packages/core/src/highlevel/base.ts @@ -11,7 +11,9 @@ import { asyncResettable, computeNewPasswordHash, computeSrpParams, + ICryptoProvider, isTlRpcError, + Logger, readStringSession, StringSessionData, writeStringSession, @@ -45,10 +47,10 @@ export class BaseTelegramClient implements ITelegramClient { private _serverUpdatesHandler: ServerUpdateHandler = () => {} private _connectionStateHandler: (state: ConnectionState) => void = () => {} - readonly log - readonly mt - readonly crypto - readonly storage + readonly log: Logger + readonly mt: MtClient + readonly crypto: ICryptoProvider + readonly storage: TelegramStorageManager constructor(readonly params: BaseTelegramClientOptions) { this.log = this.params.logger ?? new LogManager('client') diff --git a/packages/core/src/network/index.ts b/packages/core/src/network/index.ts index a1dba3f2..abd57885 100644 --- a/packages/core/src/network/index.ts +++ b/packages/core/src/network/index.ts @@ -1,5 +1,12 @@ export * from './client.js' -export type { ConnectionKind, NetworkManagerExtraParams, RpcCallOptions } from './network-manager.js' +export * from './middlewares/index.js' +export type { + ConnectionKind, + NetworkManagerExtraParams, + RpcCallMiddleware, + RpcCallMiddlewareContext, + RpcCallOptions, +} from './network-manager.js' export * from './reconnection.js' export * from './session-connection.js' export * from './transports/index.js' diff --git a/packages/core/src/network/middlewares/bundle.ts b/packages/core/src/network/middlewares/bundle.ts new file mode 100644 index 00000000..00f48163 --- /dev/null +++ b/packages/core/src/network/middlewares/bundle.ts @@ -0,0 +1,2 @@ +export * from './on-error.js' +export * from './on-method.js' diff --git a/packages/core/src/network/middlewares/index.ts b/packages/core/src/network/middlewares/index.ts new file mode 100644 index 00000000..0954b25f --- /dev/null +++ b/packages/core/src/network/middlewares/index.ts @@ -0,0 +1,3 @@ +import * as networkMiddlewares from './bundle.js' + +export { networkMiddlewares } diff --git a/packages/core/src/network/middlewares/on-error.ts b/packages/core/src/network/middlewares/on-error.ts new file mode 100644 index 00000000..07c5585f --- /dev/null +++ b/packages/core/src/network/middlewares/on-error.ts @@ -0,0 +1,49 @@ +import { mtp } from '@mtcute/tl' + +import { MaybePromise } from '../../types/utils.js' +import { isTlRpcError } from '../../utils/type-assertions.js' +import { RpcCallMiddleware, RpcCallMiddlewareContext } from '../network-manager.js' + +/** + * Middleware that will call `handler` whenever an RPC error happens, + * with the error object itself. + * + * The handler can either return nothing + * (in which case the original error will be thrown), a new error + * (via the `_: 'mt_rpc_error'` object), or any other value, which + * will be returned as the result of the RPC call. + * + * Note that the return value is **not type-checked** + * due to limitations of TypeScript. You'll probably want to use `satisfies` + * keyword to ensure the return value is correct, for example: + * + * ```ts + * networkMiddlewares.onRpcError(async (ctx, error) => { + * if (rpc.request._ === 'help.getNearestDc') { + * return { + * _: 'nearestDc', + * country: 'RU', + * thisDc: 2, + * nearestDc: 2, + * } satisfies tl.RpcCallReturn['help.getNearestDc'] + * } + * }) + * ``` + */ +export function onRpcError( + handler: (ctx: RpcCallMiddlewareContext, error: mtp.RawMt_rpc_error) => MaybePromise, +): RpcCallMiddleware { + return async (ctx, next) => { + let res = await next(ctx) + + if (isTlRpcError(res)) { + const handlerRes = await handler(ctx, res) + + if (handlerRes !== undefined) { + res = handlerRes + } + } + + return res + } +} diff --git a/packages/core/src/network/middlewares/on-method.ts b/packages/core/src/network/middlewares/on-method.ts new file mode 100644 index 00000000..b8430ad4 --- /dev/null +++ b/packages/core/src/network/middlewares/on-method.ts @@ -0,0 +1,54 @@ +import { tl } from '@mtcute/tl' + +import { Middleware } from '../../utils/composer.js' +import { RpcCallMiddleware, RpcCallMiddlewareContext } from '../network-manager.js' + +/** + * Middleware that will call `handler` whenever `method` RPC method is called. + * + * This helper exists due to TypeScript limitations not allowing us to + * properly type the return type without explicit type annotations, + * for a bit more type-safe and clean code: + * + * ```ts + * // before + * async (ctx, next) => { + * if (rpc.request._ === 'help.getNearestDc') { + * return { + * _: 'nearestDc', + * country: 'RU', + * thisDc: 2, + * nearestDc: 2, + * } satisfies tl.RpcCallReturn['help.getNearestDc'] + * } + * + * return next(ctx) + * } + * + * // after + * onMethod('help.getNearestDc', async () => ({ + * _: 'nearestDc' as const, // (otherwise ts will infer this as `string` and will complain) + * country: 'RU', + * thisDc: 2, + * nearestDc: 2, + * }) + * ``` + */ +export function onMethod( + method: T, + middleware: Middleware< + Omit & { + request: Extract + }, + tl.RpcCallReturn[T] + >, +): RpcCallMiddleware { + return async (ctx, next) => { + if (ctx.request._ !== method) { + return next(ctx) + } + + // eslint-disable-next-line + return middleware(ctx as any, next) + } +} diff --git a/packages/core/src/network/network-manager.ts b/packages/core/src/network/network-manager.ts index 59e3640e..33765100 100644 --- a/packages/core/src/network/network-manager.ts +++ b/packages/core/src/network/network-manager.ts @@ -4,6 +4,7 @@ import { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime' import { getPlatform } from '../platform.js' import { StorageManager } from '../storage/storage.js' import { MtArgumentError, MtcuteError, MtTimeoutError, MtUnsupportedError } from '../types/index.js' +import { ComposedMiddleware, composeMiddlewares, Middleware } from '../utils/composer.js' import { ControllablePromise, createControllablePromise, @@ -113,6 +114,14 @@ export interface NetworkManagerExtraParams { * @default 60000 (60 seconds). */ inactivityTimeout?: number + + /** + * List of middlewares to use for the network manager + * + * > **Note**: these middlewares apply to **outgoing requests only**. + * > If you need to handle incoming updates, use a {@link Dispatcher} instead. + */ + middlewares?: Middleware[] } /** Options that can be customized when making an RPC call */ @@ -198,6 +207,13 @@ export interface RpcCallOptions { chainId?: string | number } +export interface RpcCallMiddlewareContext { + request: tl.RpcMethod + manager: NetworkManager + params?: RpcCallOptions +} +export type RpcCallMiddleware = Middleware + /** * Wrapper over all connection pools for a single DC. */ @@ -501,6 +517,8 @@ export class NetworkManager { this._connectionCount = params.connectionCount ?? defaultConnectionCountDelegate this._updateHandler = params.onUpdate + this.call = this._composeCall(params.middlewares) + this._onConfigChanged = this._onConfigChanged.bind(this) config.onReload(this._onConfigChanged) @@ -752,11 +770,34 @@ export class NetworkManager { await this._switchPrimaryDc(this._dcConnections.get(newDc)!) } - private _floodWaitedRequests = new Map() - async call( + readonly call: ( message: T, params?: RpcCallOptions, - ): Promise { + ) => Promise + + private _composeCall = (middlewares?: Middleware[]) => { + if (!middlewares?.length) { + return this._call + } + + const final: ComposedMiddleware = async (ctx) => { + return this._call(ctx.request, ctx.params) + } + const composed = composeMiddlewares(middlewares, final) + + return async (message: T, params?: RpcCallOptions): Promise => + composed({ + request: message, + manager: this, + params, + }) + } + + private _floodWaitedRequests = new Map() + private _call = async ( + message: T, + params?: RpcCallOptions, + ): Promise => { if (!this._primaryDc) { throw new MtcuteError('Not connected to any DC') } @@ -871,7 +912,7 @@ export class NetworkManager { if (params?.localMigrate) { manager = await this._getOtherDc(newDc) } else { - this._log.info('Migrate error, new dc = %d', newDc) + this._log.info('received %s, migrating to dc %d', err, newDc) await this.changePrimaryDc(newDc) manager = this._primaryDc! diff --git a/packages/core/src/utils/composer.test.ts b/packages/core/src/utils/composer.test.ts new file mode 100644 index 00000000..9fa79ad4 --- /dev/null +++ b/packages/core/src/utils/composer.test.ts @@ -0,0 +1,179 @@ +/* eslint-disable @typescript-eslint/require-await */ +import { describe, expect, it } from 'vitest' + +import { composeMiddlewares, Middleware } from './composer.js' + +describe('composeMiddlewares', () => { + it('should compose middlewares', async () => { + const trace: unknown[] = [] + const middlewares: Middleware[] = [ + async (ctx, next) => { + trace.push(ctx) + trace.push(1) + await next([...ctx, 1]) + trace.push(6) + }, + async (ctx, next) => { + trace.push(ctx) + trace.push(2) + await next([...ctx, 2]) + trace.push(5) + }, + async (ctx, next) => { + trace.push(ctx) + trace.push(3) + await next([...ctx, 3]) + trace.push(4) + }, + ] + + const composed = composeMiddlewares(middlewares, async (res) => { + result = res + }) + + let result: readonly number[] = [] + await composed([]) + + expect(trace).toEqual([[], 1, [1], 2, [1, 2], 3, 4, 5, 6]) + expect(result).toEqual([1, 2, 3]) + }) + + it('should handle multiple calls to final', async () => { + const trace: unknown[] = [] + + const middlewares: Middleware[] = [ + async (ctx, next) => { + trace.push(1) + await next([2]) + trace.push(3) + await next([4]) + trace.push(5) + }, + ] + + const composed = composeMiddlewares(middlewares, async (res) => { + trace.push(res) + }) + + await composed([]) + + expect(trace).toEqual([1, [2], 3, [4], 5]) + }) + + it('should handle multiple calls to next midway', async () => { + const trace: unknown[] = [] + + const middlewares: Middleware[] = [ + async (ctx, next) => { + trace.push(1) + await next([2]) + trace.push(3) + await next([4]) + trace.push(5) + }, + (ctx, next) => next([6, ...ctx]), + ] + + const composed = composeMiddlewares(middlewares, async (res) => { + trace.push(res) + }) + + await composed([]) + + expect(trace).toEqual([1, [6, 2], 3, [6, 4], 5]) + }) + + it('should handle leaf middleware', async () => { + const trace: unknown[] = [] + + const middlewares: Middleware[] = [ + async (ctx, next) => { + trace.push(1) + + return next(ctx) + }, + async () => { + /* do nothing */ + }, + ] + + const composed = composeMiddlewares(middlewares, async (res) => { + trace.push(res) // should not be called + }) + + await composed([]) + + expect(trace).toEqual([1]) + }) + + it('should propagate return value', async () => { + const trace: unknown[] = [] + + const middlewares: Middleware[] = [ + async (ctx, next) => { + trace.push(1) + const res = await next([2]) + trace.push(3) + const res2 = await next([3, 4, 5]) + trace.push(6) + + return res + res2 + }, + async (ctx, next) => { + trace.push(-1) + + return (await next(ctx)) + 1 + }, + ] + + const composed = composeMiddlewares(middlewares, async (res) => { + trace.push(res) + + return res.length + }) + + const result = await composed([]) + + expect(trace).toEqual([1, -1, [2], 3, -1, [3, 4, 5], 6]) + expect(result).toBe(6) + }) + + it('should propagate errors', async () => { + const trace: unknown[] = [] + + const middlewares: Middleware[] = [ + async (ctx, next) => { + trace.push(1) + + try { + await next(2) + } catch (e) { + trace.push('caught error') + } + + trace.push(3) + await next(4) + trace.push(5) + }, + (ctx, next) => next(ctx), // pass-thru + async (ctx, next) => { + if (ctx === 2) { + trace.push('error') + throw new Error('error') + } else { + trace.push('ok') + + return next(ctx) + } + }, + ] + + const composed = composeMiddlewares(middlewares, async (res) => { + trace.push(`final ${res}`) + }) + + await composed(0) + + expect(trace).toEqual([1, 'error', 'caught error', 3, 'ok', 'final 4', 5]) + }) +}) diff --git a/packages/core/src/utils/composer.ts b/packages/core/src/utils/composer.ts new file mode 100644 index 00000000..5c4037b9 --- /dev/null +++ b/packages/core/src/utils/composer.ts @@ -0,0 +1,30 @@ +export type Middleware = ( + ctx: Context, + next: (ctx: Context) => Promise, +) => Promise +export type ComposedMiddleware = (ctx: Context) => Promise + +export function composeMiddlewares( + middlewares: Middleware[], + final: ComposedMiddleware, +): ComposedMiddleware { + middlewares = middlewares.slice() + middlewares.push(final) + + function dispatch(i: number, ctx: Context): Promise { + const fn = middlewares[i] + if (!fn) return final(ctx) + + return fn(ctx, boundDispatches[i + 1]) + } + + const boundDispatches: Array<(ctx: Context) => Promise> = [] + + for (let i = 0; i < middlewares.length; i++) { + boundDispatches.push(dispatch.bind(null, i)) + } + + return function (context: Context): Promise { + return boundDispatches[0](context) + } +} diff --git a/packages/core/src/utils/index.ts b/packages/core/src/utils/index.ts index 2a9c6b56..f3b0c972 100644 --- a/packages/core/src/utils/index.ts +++ b/packages/core/src/utils/index.ts @@ -8,6 +8,7 @@ export * from '../storage/service/default-dcs.js' export * from './async-lock.js' export * from './bigint-utils.js' export * from './buffer-utils.js' +export * from './composer.js' export * from './condition-variable.js' export * from './controllable-promise.js' export * from './crypto/index.js' From bb7baf263b8fac2e9bff6cb7c992b4b98478787f Mon Sep 17 00:00:00 2001 From: alina sireneva Date: Tue, 16 Jul 2024 21:28:04 +0300 Subject: [PATCH 2/2] refactor(core)!: extracted some of the default behaviour into middlewares breaking: global `floodSleepThreshold` and `maxRetryCount` are now configured via middlewares: ```ts const tg = new TelegramClient({ network: { middlewares: networkMiddlewares.basic({ floodWaiter: { maxWait: 5000, maxRetries: 5 }, internalErrors: { maxRetries: 5 } }) } }) ``` --- packages/core/src/network/client.ts | 20 --- .../core/src/network/middlewares/bundle.ts | 1 + .../core/src/network/middlewares/default.ts | 12 ++ .../src/network/middlewares/flood-waiter.ts | 128 +++++++++++++ .../network/middlewares/internal-errors.ts | 71 ++++++++ packages/core/src/network/network-manager.ts | 169 +++++------------- packages/core/src/utils/abort-signal.ts | 10 ++ 7 files changed, 265 insertions(+), 146 deletions(-) create mode 100644 packages/core/src/network/middlewares/default.ts create mode 100644 packages/core/src/network/middlewares/flood-waiter.ts create mode 100644 packages/core/src/network/middlewares/internal-errors.ts create mode 100644 packages/core/src/utils/abort-signal.ts diff --git a/packages/core/src/network/client.ts b/packages/core/src/network/client.ts index e59c93b1..f60ff5aa 100644 --- a/packages/core/src/network/client.ts +++ b/packages/core/src/network/client.ts @@ -103,24 +103,6 @@ export interface MtClientOptions { */ reconnectionStrategy?: ReconnectionStrategy - /** - * Maximum duration of a flood_wait that will be waited automatically. - * Flood waits above this threshold will throw a FloodWaitError. - * Set to 0 to disable. Can be overridden with `throwFlood` parameter in call() params - * - * @default 10000 - */ - floodSleepThreshold?: number - - /** - * Maximum number of retries when calling RPC methods. - * Call is retried when InternalError or FloodWaitError is encountered. - * Can be set to Infinity. - * - * @default 5 - */ - maxRetryCount?: number - /** * If true, all API calls will be wrapped with `tl.invokeWithoutUpdates`, * effectively disabling the server-sent events for the clients. @@ -287,8 +269,6 @@ export class MtClient extends EventEmitter { testMode: Boolean(params.testMode), transport: params.transport, emitError: this.emitError.bind(this), - floodSleepThreshold: params.floodSleepThreshold ?? 10000, - maxRetryCount: params.maxRetryCount ?? 5, isPremium: false, useIpv6: Boolean(params.useIpv6), enableErrorReporting: params.enableErrorReporting ?? false, diff --git a/packages/core/src/network/middlewares/bundle.ts b/packages/core/src/network/middlewares/bundle.ts index 00f48163..67968f23 100644 --- a/packages/core/src/network/middlewares/bundle.ts +++ b/packages/core/src/network/middlewares/bundle.ts @@ -1,2 +1,3 @@ +export * from './default.js' export * from './on-error.js' export * from './on-method.js' diff --git a/packages/core/src/network/middlewares/default.ts b/packages/core/src/network/middlewares/default.ts new file mode 100644 index 00000000..6a904910 --- /dev/null +++ b/packages/core/src/network/middlewares/default.ts @@ -0,0 +1,12 @@ +import { RpcCallMiddleware } from '../network-manager.js' +import { floodWaiter, FloodWaiterOptions } from './flood-waiter.js' +import { internalErrorsHandler, InternalErrorsHandlerOptions } from './internal-errors.js' + +export interface BasicMiddlewaresOptions { + floodWaiter?: FloodWaiterOptions + internalErrors?: InternalErrorsHandlerOptions +} + +export const basic = (options?: BasicMiddlewaresOptions): RpcCallMiddleware[] => { + return [floodWaiter(options?.floodWaiter ?? {}), internalErrorsHandler(options?.internalErrors ?? {})] +} diff --git a/packages/core/src/network/middlewares/flood-waiter.ts b/packages/core/src/network/middlewares/flood-waiter.ts new file mode 100644 index 00000000..e4bed0da --- /dev/null +++ b/packages/core/src/network/middlewares/flood-waiter.ts @@ -0,0 +1,128 @@ +import { mtp } from '@mtcute/tl' + +import { combineAbortSignals } from '../../utils/abort-signal.js' +import { sleepWithAbort } from '../../utils/misc-utils.js' +import { isTlRpcError } from '../../utils/type-assertions.js' +import { RpcCallMiddleware } from '../network-manager.js' + +export interface FloodWaiterOptions { + /** + * Maximum number of ms to wait when a FLOOD_WAIT_X + * error is encountered. If the wait time is greater + * than this value, the request will throw an error instead + * + * This can be overwritten on a per-request basis by setting + * `floodSleepThreshold` in the request parameters + * + * @default 10_000 + */ + maxWait?: number + + /** + * Maximum number of retries to perform when a FLOOD_WAIT_X + * error is encountered. After this number of retries, the + * last error will be thrown + * + * @default 5 + */ + maxRetries?: number + + /** + * Whether to store the last flood wait time and delay + * the consecutive requests accordingly + * + * @default true + */ + store?: boolean + + /** + * If the stored wait time is less than this value, + * the request will not be delayed + * + * @default 3_000 + */ + minStoredWait?: number +} + +export function floodWaiter(options: FloodWaiterOptions): RpcCallMiddleware { + const { maxWait = 10_000, maxRetries = 5, store = true, minStoredWait = 3_000 } = options + + const storage = new Map() + + return async (ctx, next) => { + // do not send requests that are in flood wait + const method = ctx.request._ + const storedWaitUntil = store ? storage.get(method) : undefined + const floodSleepThreshold = ctx.params?.floodSleepThreshold ?? maxWait + + if (storedWaitUntil !== undefined) { + const delta = storedWaitUntil - Date.now() + + if (delta <= minStoredWait) { + // flood waits below 3 seconds are "ignored" + storage.delete(method) + } else if (delta <= floodSleepThreshold) { + await sleepWithAbort(delta, combineAbortSignals(ctx.manager.params.stopSignal, ctx.params?.abortSignal)) + storage.delete(method) + } else { + return { + _: 'mt_rpc_error', + errorCode: 420, + errorMessage: `FLOOD_WAIT_${Math.ceil(delta / 1000)}`, + } satisfies mtp.RawMt_rpc_error + } + } + + let lastError: mtp.RawMt_rpc_error | undefined + + for (let i = 0; i <= maxRetries; i++) { + const res = await next(ctx) + + if (!isTlRpcError(res)) { + return res + } + + lastError = res + + const err = res.errorMessage + + if ( + err.startsWith('FLOOD_WAIT_') || + err.startsWith('SLOWMODE_WAIT_') || + err.startsWith('FLOOD_TEST_PHONE_WAIT_') + ) { + let seconds = Number(err.slice(err.lastIndexOf('_') + 1)) + + if (Number.isNaN(seconds)) { + ctx.manager._log.warn('invalid flood wait error received: %s, ignoring', err) + + return res + } + + if (store && !err.startsWith('SLOWMODE_WAIT_')) { + // SLOW_MODE_WAIT is per-chat, not per-request + storage.set(method, Date.now() + seconds * 1000) + } + + // In test servers, FLOOD_WAIT_0 has been observed, and sleeping for + // such a short amount will cause retries very fast leading to issues + if (seconds === 0) { + seconds = 1 + } + + const ms = seconds * 1000 + + if (ms <= floodSleepThreshold) { + ctx.manager._log.warn('%s resulted in a flood wait, will retry in %d seconds', method, seconds) + await sleepWithAbort( + ms, + combineAbortSignals(ctx.manager.params.stopSignal, ctx.params?.abortSignal), + ) + continue + } + } + } + + return lastError + } +} diff --git a/packages/core/src/network/middlewares/internal-errors.ts b/packages/core/src/network/middlewares/internal-errors.ts new file mode 100644 index 00000000..3b0e31ea --- /dev/null +++ b/packages/core/src/network/middlewares/internal-errors.ts @@ -0,0 +1,71 @@ +import { tl } from '@mtcute/tl' + +import { MtTimeoutError } from '../../types/errors.js' +import { combineAbortSignals } from '../../utils/abort-signal.js' +import { sleepWithAbort } from '../../utils/misc-utils.js' +import { isTlRpcError } from '../../utils/type-assertions.js' +import { RpcCallMiddleware } from '../network-manager.js' + +const CLIENT_ERRORS = /* #__PURE__ */ new Set([ + tl.RpcError.BAD_REQUEST, + tl.RpcError.UNAUTHORIZED, + tl.RpcError.FORBIDDEN, + tl.RpcError.NOT_FOUND, + tl.RpcError.FLOOD, + tl.RpcError.SEE_OTHER, + tl.RpcError.NOT_ACCEPTABLE, +]) + +export interface InternalErrorsHandlerOptions { + /** + * Maximum number of retries to perform when an internal server error is encountered. + * + * @default Infinity + */ + maxRetries?: number + + /** + * Number of seconds to wait before retrying + * when an internal server error is encountered. + * + * @default 1 + */ + waitTime?: number +} + +export function internalErrorsHandler(params: InternalErrorsHandlerOptions): RpcCallMiddleware { + const { maxRetries = Infinity, waitTime = 1 } = params + + return async (ctx, next) => { + const numRetries = ctx.params?.maxRetryCount ?? maxRetries + + for (let i = 0; i <= numRetries; i++) { + const res = await next(ctx) + if (!isTlRpcError(res)) return res + + if (!CLIENT_ERRORS.has(res.errorCode)) { + if (ctx.params?.throw503 && res.errorCode === -503) { + throw new MtTimeoutError() + } + + const waitSeconds = res.errorMessage === 'WORKER_BUSY_TOO_LONG_RETRY' ? Math.max(1, waitTime) : waitTime + + ctx.manager._log.warn( + 'Telegram is having internal issues: %d:%s, retrying in %ds', + res.errorCode, + res.errorMessage, + waitSeconds, + ) + + if (waitSeconds > 0) { + await sleepWithAbort( + waitSeconds * 1000, + combineAbortSignals(ctx.manager.params.stopSignal, ctx.params?.abortSignal), + ) + } + } + + return res + } + } +} diff --git a/packages/core/src/network/network-manager.ts b/packages/core/src/network/network-manager.ts index 33765100..c1c2b84a 100644 --- a/packages/core/src/network/network-manager.ts +++ b/packages/core/src/network/network-manager.ts @@ -3,18 +3,12 @@ import { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime' import { getPlatform } from '../platform.js' import { StorageManager } from '../storage/storage.js' -import { MtArgumentError, MtcuteError, MtTimeoutError, MtUnsupportedError } from '../types/index.js' +import { MtArgumentError, MtcuteError, MtUnsupportedError } from '../types/index.js' import { ComposedMiddleware, composeMiddlewares, Middleware } from '../utils/composer.js' -import { - ControllablePromise, - createControllablePromise, - DcOptions, - ICryptoProvider, - Logger, - sleepWithAbort, -} from '../utils/index.js' +import { ControllablePromise, createControllablePromise, DcOptions, ICryptoProvider, Logger } from '../utils/index.js' import { assertTypeIs, isTlRpcError } from '../utils/type-assertions.js' import { ConfigManager } from './config-manager.js' +import { basic as defaultMiddlewares } from './middlewares/default.js' import { MultiSessionConnection } from './multi-session-connection.js' import { PersistentConnectionParams } from './persistent-connection.js' import { defaultReconnectionStrategy, ReconnectionStrategy } from './reconnection.js' @@ -24,16 +18,6 @@ import { TransportFactory } from './transports/index.js' export type ConnectionKind = 'main' | 'upload' | 'download' | 'downloadSmall' -const CLIENT_ERRORS = { - [tl.RpcError.BAD_REQUEST]: 1, - [tl.RpcError.UNAUTHORIZED]: 1, - [tl.RpcError.FORBIDDEN]: 1, - [tl.RpcError.NOT_FOUND]: 1, - [tl.RpcError.FLOOD]: 1, - [tl.RpcError.SEE_OTHER]: 1, - [tl.RpcError.NOT_ACCEPTABLE]: 1, -} - /** * Params passed into {@link NetworkManager} by {@link TelegramClient}. * This type is intended for internal usage only. @@ -48,8 +32,6 @@ export interface NetworkManagerParams { initConnectionOptions?: Partial> transport: TransportFactory reconnectionStrategy?: ReconnectionStrategy - floodSleepThreshold: number - maxRetryCount: number disableUpdates?: boolean testMode: boolean layer: number @@ -132,7 +114,7 @@ export interface RpcCallOptions { * * If set to `0`, the call will not be retried. * - * @default {@link BaseTelegramClientOptions.floodSleepThreshold} + * Only applies when the flood waiter middleware is enabled. */ floodSleepThreshold?: number @@ -140,7 +122,8 @@ export interface RpcCallOptions { * If the call results in an internal server error or a flood wait, * the maximum amount of times to retry the call. * - * @default {@link BaseTelegramClientOptions.maxRetryCount} + * Only applies when the flood waiter middleware and/or + * internal errors handler middleware is enabled. */ maxRetryCount?: number @@ -185,6 +168,9 @@ export interface RpcCallOptions { * * Useful for methods like `messages.getBotCallbackAnswer` that reliably return * -503 in case the upstream bot failed to respond. + * + * Only applies if the internal error handler middleware is enabled, + * otherwise -503 is always thrown. */ throw503?: boolean @@ -776,7 +762,11 @@ export class NetworkManager { ) => Promise private _composeCall = (middlewares?: Middleware[]) => { - if (!middlewares?.length) { + if (!middlewares) { + middlewares = defaultMiddlewares() + } + + if (!middlewares.length) { return this._call } @@ -793,7 +783,6 @@ export class NetworkManager { }) } - private _floodWaitedRequests = new Map() private _call = async ( message: T, params?: RpcCallOptions, @@ -802,29 +791,6 @@ export class NetworkManager { throw new MtcuteError('Not connected to any DC') } - const floodSleepThreshold = params?.floodSleepThreshold ?? this.params.floodSleepThreshold - const maxRetryCount = params?.maxRetryCount ?? this.params.maxRetryCount - const throw503 = params?.throw503 ?? false - - // do not send requests that are in flood wait - if (this._floodWaitedRequests.has(message._)) { - const delta = this._floodWaitedRequests.get(message._)! - Date.now() - - if (delta <= 3000) { - // flood waits below 3 seconds are "ignored" - this._floodWaitedRequests.delete(message._) - } else if (delta <= this.params.floodSleepThreshold) { - await sleepWithAbort(delta, this.params.stopSignal) - this._floodWaitedRequests.delete(message._) - } else { - const err = tl.RpcError.create(tl.RpcError.FLOOD, 'FLOOD_WAIT_%d') - err.seconds = Math.ceil(delta / 1000) - throw err - } - } - - let lastError: mtp.RawMt_rpc_error | null = null - const kind = params?.kind ?? 'main' let manager: DcConnectionManager @@ -838,102 +804,53 @@ export class NetworkManager { let multi = manager[kind] - for (let i = 0; i < maxRetryCount; i++) { - const res = await multi.sendRpc(message, params?.timeout, params?.abortSignal, params?.chainId) + let res = await multi.sendRpc(message, params?.timeout, params?.abortSignal, params?.chainId) - if (!isTlRpcError(res)) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-return - return res - } + if (!isTlRpcError(res)) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-return + return res + } - lastError = res - - const err = res.errorMessage - - if (!(res.errorCode in CLIENT_ERRORS)) { - if (throw503 && res.errorCode === -503) { - throw new MtTimeoutError() - } - - this._log.warn('Telegram is having internal issues: %d:%s, retrying', res.errorCode, err) - - if (err === 'WORKER_BUSY_TOO_LONG_RETRY') { - // according to tdlib, "it is dangerous to resend query without timeout, so use 1" - await sleepWithAbort(1000, this.params.stopSignal) - } - continue - } + const err = res.errorMessage + if (manager === this._primaryDc) { if ( - err.startsWith('FLOOD_WAIT_') || - err.startsWith('SLOWMODE_WAIT_') || - err.startsWith('FLOOD_TEST_PHONE_WAIT_') + err.startsWith('PHONE_MIGRATE_') || + err.startsWith('NETWORK_MIGRATE_') || + err.startsWith('USER_MIGRATE_') ) { - let seconds = Number(err.lastIndexOf('_') + 1) + const newDc = Number(err.slice(err.lastIndexOf('_') + 1)) - if (Number.isNaN(seconds)) { - this._log.warn('invalid flood wait error received: %s, ignoring', err) + if (Number.isNaN(newDc)) { + this._log.warn('invalid migrate error received: %s, ignoring', err) return res } - if (!err.startsWith('SLOWMODE_WAIT_')) { - // SLOW_MODE_WAIT is chat-specific, not request-specific - this._floodWaitedRequests.set(message._, Date.now() + seconds * 1000) + if (params?.localMigrate) { + manager = await this._getOtherDc(newDc) + } else { + this._log.info('received %s, migrating to dc %d', err, newDc) + + await this.changePrimaryDc(newDc) + manager = this._primaryDc! } - // In test servers, FLOOD_WAIT_0 has been observed, and sleeping for - // such a short amount will cause retries very fast leading to issues - if (seconds === 0) { - seconds = 1 - } + multi = manager[kind] - if (seconds <= floodSleepThreshold) { - this._log.warn('%s resulted in a flood wait, will retry in %d seconds', message._, seconds) - await sleepWithAbort(seconds * 1000, this.params.stopSignal) - continue - } + res = await multi.sendRpc(message, params?.timeout, params?.abortSignal, params?.chainId) } + } else if (err === 'AUTH_KEY_UNREGISTERED') { + // we can try re-exporting auth from the primary connection + this._log.warn('exported auth key error, trying re-exporting..') - if (manager === this._primaryDc) { - if ( - err.startsWith('PHONE_MIGRATE_') || - err.startsWith('NETWORK_MIGRATE_') || - err.startsWith('USER_MIGRATE_') - ) { - const newDc = Number(err.slice(err.lastIndexOf('_') + 1)) + await this._exportAuthTo(manager) - if (Number.isNaN(newDc)) { - this._log.warn('invalid migrate error received: %s, ignoring', err) - - return res - } - - if (params?.localMigrate) { - manager = await this._getOtherDc(newDc) - } else { - this._log.info('received %s, migrating to dc %d', err, newDc) - - await this.changePrimaryDc(newDc) - manager = this._primaryDc! - } - - multi = manager[kind] - - continue - } - } else if (err === 'AUTH_KEY_UNREGISTERED') { - // we can try re-exporting auth from the primary connection - this._log.warn('exported auth key error, trying re-exporting..') - - await this._exportAuthTo(manager) - continue - } - - return res + res = await multi.sendRpc(message, params?.timeout, params?.abortSignal, params?.chainId) } - return lastError! + // eslint-disable-next-line @typescript-eslint/no-unsafe-return + return res } changeTransport(factory: TransportFactory): void { diff --git a/packages/core/src/utils/abort-signal.ts b/packages/core/src/utils/abort-signal.ts new file mode 100644 index 00000000..876d4c0c --- /dev/null +++ b/packages/core/src/utils/abort-signal.ts @@ -0,0 +1,10 @@ +export function combineAbortSignals(signal1: AbortSignal, signal2?: AbortSignal): AbortSignal { + if (!signal2) return signal1 + + const controller = new AbortController() + + signal1.addEventListener('abort', () => controller.abort()) + signal2.addEventListener('abort', () => controller.abort()) + + return controller.signal +}