network middlewares #60
7 changed files with 265 additions and 146 deletions
|
@ -103,24 +103,6 @@ export interface MtClientOptions {
|
|||
*/
|
||||
reconnectionStrategy?: ReconnectionStrategy<PersistentConnectionParams>
|
||||
|
||||
/**
|
||||
* Maximum duration of a flood_wait that will be waited automatically.
|
||||
* Flood waits above this threshold will throw a FloodWaitError.
|
||||
* Set to 0 to disable. Can be overridden with `throwFlood` parameter in call() params
|
||||
*
|
||||
* @default 10000
|
||||
*/
|
||||
floodSleepThreshold?: number
|
||||
|
||||
/**
|
||||
* Maximum number of retries when calling RPC methods.
|
||||
* Call is retried when InternalError or FloodWaitError is encountered.
|
||||
* Can be set to Infinity.
|
||||
*
|
||||
* @default 5
|
||||
*/
|
||||
maxRetryCount?: number
|
||||
|
||||
/**
|
||||
* If true, all API calls will be wrapped with `tl.invokeWithoutUpdates`,
|
||||
* effectively disabling the server-sent events for the clients.
|
||||
|
@ -287,8 +269,6 @@ export class MtClient extends EventEmitter {
|
|||
testMode: Boolean(params.testMode),
|
||||
transport: params.transport,
|
||||
emitError: this.emitError.bind(this),
|
||||
floodSleepThreshold: params.floodSleepThreshold ?? 10000,
|
||||
maxRetryCount: params.maxRetryCount ?? 5,
|
||||
isPremium: false,
|
||||
useIpv6: Boolean(params.useIpv6),
|
||||
enableErrorReporting: params.enableErrorReporting ?? false,
|
||||
|
|
|
@ -1,2 +1,3 @@
|
|||
export * from './default.js'
|
||||
export * from './on-error.js'
|
||||
export * from './on-method.js'
|
||||
|
|
12
packages/core/src/network/middlewares/default.ts
Normal file
12
packages/core/src/network/middlewares/default.ts
Normal file
|
@ -0,0 +1,12 @@
|
|||
import { RpcCallMiddleware } from '../network-manager.js'
|
||||
import { floodWaiter, FloodWaiterOptions } from './flood-waiter.js'
|
||||
import { internalErrorsHandler, InternalErrorsHandlerOptions } from './internal-errors.js'
|
||||
|
||||
export interface BasicMiddlewaresOptions {
|
||||
floodWaiter?: FloodWaiterOptions
|
||||
internalErrors?: InternalErrorsHandlerOptions
|
||||
}
|
||||
|
||||
export const basic = (options?: BasicMiddlewaresOptions): RpcCallMiddleware[] => {
|
||||
return [floodWaiter(options?.floodWaiter ?? {}), internalErrorsHandler(options?.internalErrors ?? {})]
|
||||
}
|
128
packages/core/src/network/middlewares/flood-waiter.ts
Normal file
128
packages/core/src/network/middlewares/flood-waiter.ts
Normal file
|
@ -0,0 +1,128 @@
|
|||
import { mtp } from '@mtcute/tl'
|
||||
|
||||
import { combineAbortSignals } from '../../utils/abort-signal.js'
|
||||
import { sleepWithAbort } from '../../utils/misc-utils.js'
|
||||
import { isTlRpcError } from '../../utils/type-assertions.js'
|
||||
import { RpcCallMiddleware } from '../network-manager.js'
|
||||
|
||||
export interface FloodWaiterOptions {
|
||||
/**
|
||||
* Maximum number of ms to wait when a FLOOD_WAIT_X
|
||||
* error is encountered. If the wait time is greater
|
||||
* than this value, the request will throw an error instead
|
||||
*
|
||||
* This can be overwritten on a per-request basis by setting
|
||||
* `floodSleepThreshold` in the request parameters
|
||||
*
|
||||
* @default 10_000
|
||||
*/
|
||||
maxWait?: number
|
||||
|
||||
/**
|
||||
* Maximum number of retries to perform when a FLOOD_WAIT_X
|
||||
* error is encountered. After this number of retries, the
|
||||
* last error will be thrown
|
||||
*
|
||||
* @default 5
|
||||
*/
|
||||
maxRetries?: number
|
||||
|
||||
/**
|
||||
* Whether to store the last flood wait time and delay
|
||||
* the consecutive requests accordingly
|
||||
*
|
||||
* @default true
|
||||
*/
|
||||
store?: boolean
|
||||
|
||||
/**
|
||||
* If the stored wait time is less than this value,
|
||||
* the request will not be delayed
|
||||
*
|
||||
* @default 3_000
|
||||
*/
|
||||
minStoredWait?: number
|
||||
}
|
||||
|
||||
export function floodWaiter(options: FloodWaiterOptions): RpcCallMiddleware {
|
||||
const { maxWait = 10_000, maxRetries = 5, store = true, minStoredWait = 3_000 } = options
|
||||
|
||||
const storage = new Map<string, number>()
|
||||
|
||||
return async (ctx, next) => {
|
||||
// do not send requests that are in flood wait
|
||||
const method = ctx.request._
|
||||
const storedWaitUntil = store ? storage.get(method) : undefined
|
||||
const floodSleepThreshold = ctx.params?.floodSleepThreshold ?? maxWait
|
||||
|
||||
if (storedWaitUntil !== undefined) {
|
||||
const delta = storedWaitUntil - Date.now()
|
||||
|
||||
if (delta <= minStoredWait) {
|
||||
// flood waits below 3 seconds are "ignored"
|
||||
storage.delete(method)
|
||||
} else if (delta <= floodSleepThreshold) {
|
||||
await sleepWithAbort(delta, combineAbortSignals(ctx.manager.params.stopSignal, ctx.params?.abortSignal))
|
||||
storage.delete(method)
|
||||
} else {
|
||||
return {
|
||||
_: 'mt_rpc_error',
|
||||
errorCode: 420,
|
||||
errorMessage: `FLOOD_WAIT_${Math.ceil(delta / 1000)}`,
|
||||
} satisfies mtp.RawMt_rpc_error
|
||||
}
|
||||
}
|
||||
|
||||
let lastError: mtp.RawMt_rpc_error | undefined
|
||||
|
||||
for (let i = 0; i <= maxRetries; i++) {
|
||||
const res = await next(ctx)
|
||||
|
||||
if (!isTlRpcError(res)) {
|
||||
return res
|
||||
}
|
||||
|
||||
lastError = res
|
||||
|
||||
const err = res.errorMessage
|
||||
|
||||
if (
|
||||
err.startsWith('FLOOD_WAIT_') ||
|
||||
err.startsWith('SLOWMODE_WAIT_') ||
|
||||
err.startsWith('FLOOD_TEST_PHONE_WAIT_')
|
||||
) {
|
||||
let seconds = Number(err.slice(err.lastIndexOf('_') + 1))
|
||||
|
||||
if (Number.isNaN(seconds)) {
|
||||
ctx.manager._log.warn('invalid flood wait error received: %s, ignoring', err)
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
if (store && !err.startsWith('SLOWMODE_WAIT_')) {
|
||||
// SLOW_MODE_WAIT is per-chat, not per-request
|
||||
storage.set(method, Date.now() + seconds * 1000)
|
||||
}
|
||||
|
||||
// In test servers, FLOOD_WAIT_0 has been observed, and sleeping for
|
||||
// such a short amount will cause retries very fast leading to issues
|
||||
if (seconds === 0) {
|
||||
seconds = 1
|
||||
}
|
||||
|
||||
const ms = seconds * 1000
|
||||
|
||||
if (ms <= floodSleepThreshold) {
|
||||
ctx.manager._log.warn('%s resulted in a flood wait, will retry in %d seconds', method, seconds)
|
||||
await sleepWithAbort(
|
||||
ms,
|
||||
combineAbortSignals(ctx.manager.params.stopSignal, ctx.params?.abortSignal),
|
||||
)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return lastError
|
||||
}
|
||||
}
|
71
packages/core/src/network/middlewares/internal-errors.ts
Normal file
71
packages/core/src/network/middlewares/internal-errors.ts
Normal file
|
@ -0,0 +1,71 @@
|
|||
import { tl } from '@mtcute/tl'
|
||||
|
||||
import { MtTimeoutError } from '../../types/errors.js'
|
||||
import { combineAbortSignals } from '../../utils/abort-signal.js'
|
||||
import { sleepWithAbort } from '../../utils/misc-utils.js'
|
||||
import { isTlRpcError } from '../../utils/type-assertions.js'
|
||||
import { RpcCallMiddleware } from '../network-manager.js'
|
||||
|
||||
const CLIENT_ERRORS = /* #__PURE__ */ new Set<number>([
|
||||
tl.RpcError.BAD_REQUEST,
|
||||
tl.RpcError.UNAUTHORIZED,
|
||||
tl.RpcError.FORBIDDEN,
|
||||
tl.RpcError.NOT_FOUND,
|
||||
tl.RpcError.FLOOD,
|
||||
tl.RpcError.SEE_OTHER,
|
||||
tl.RpcError.NOT_ACCEPTABLE,
|
||||
])
|
||||
|
||||
export interface InternalErrorsHandlerOptions {
|
||||
/**
|
||||
* Maximum number of retries to perform when an internal server error is encountered.
|
||||
*
|
||||
* @default Infinity
|
||||
*/
|
||||
maxRetries?: number
|
||||
|
||||
/**
|
||||
* Number of seconds to wait before retrying
|
||||
* when an internal server error is encountered.
|
||||
*
|
||||
* @default 1
|
||||
*/
|
||||
waitTime?: number
|
||||
}
|
||||
|
||||
export function internalErrorsHandler(params: InternalErrorsHandlerOptions): RpcCallMiddleware {
|
||||
const { maxRetries = Infinity, waitTime = 1 } = params
|
||||
|
||||
return async (ctx, next) => {
|
||||
const numRetries = ctx.params?.maxRetryCount ?? maxRetries
|
||||
|
||||
for (let i = 0; i <= numRetries; i++) {
|
||||
const res = await next(ctx)
|
||||
if (!isTlRpcError(res)) return res
|
||||
|
||||
if (!CLIENT_ERRORS.has(res.errorCode)) {
|
||||
if (ctx.params?.throw503 && res.errorCode === -503) {
|
||||
throw new MtTimeoutError()
|
||||
}
|
||||
|
||||
const waitSeconds = res.errorMessage === 'WORKER_BUSY_TOO_LONG_RETRY' ? Math.max(1, waitTime) : waitTime
|
||||
|
||||
ctx.manager._log.warn(
|
||||
'Telegram is having internal issues: %d:%s, retrying in %ds',
|
||||
res.errorCode,
|
||||
res.errorMessage,
|
||||
waitSeconds,
|
||||
)
|
||||
|
||||
if (waitSeconds > 0) {
|
||||
await sleepWithAbort(
|
||||
waitSeconds * 1000,
|
||||
combineAbortSignals(ctx.manager.params.stopSignal, ctx.params?.abortSignal),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
}
|
||||
}
|
|
@ -3,18 +3,12 @@ import { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime'
|
|||
|
||||
import { getPlatform } from '../platform.js'
|
||||
import { StorageManager } from '../storage/storage.js'
|
||||
import { MtArgumentError, MtcuteError, MtTimeoutError, MtUnsupportedError } from '../types/index.js'
|
||||
import { MtArgumentError, MtcuteError, MtUnsupportedError } from '../types/index.js'
|
||||
import { ComposedMiddleware, composeMiddlewares, Middleware } from '../utils/composer.js'
|
||||
import {
|
||||
ControllablePromise,
|
||||
createControllablePromise,
|
||||
DcOptions,
|
||||
ICryptoProvider,
|
||||
Logger,
|
||||
sleepWithAbort,
|
||||
} from '../utils/index.js'
|
||||
import { ControllablePromise, createControllablePromise, DcOptions, ICryptoProvider, Logger } from '../utils/index.js'
|
||||
import { assertTypeIs, isTlRpcError } from '../utils/type-assertions.js'
|
||||
import { ConfigManager } from './config-manager.js'
|
||||
import { basic as defaultMiddlewares } from './middlewares/default.js'
|
||||
import { MultiSessionConnection } from './multi-session-connection.js'
|
||||
import { PersistentConnectionParams } from './persistent-connection.js'
|
||||
import { defaultReconnectionStrategy, ReconnectionStrategy } from './reconnection.js'
|
||||
|
@ -24,16 +18,6 @@ import { TransportFactory } from './transports/index.js'
|
|||
|
||||
export type ConnectionKind = 'main' | 'upload' | 'download' | 'downloadSmall'
|
||||
|
||||
const CLIENT_ERRORS = {
|
||||
[tl.RpcError.BAD_REQUEST]: 1,
|
||||
[tl.RpcError.UNAUTHORIZED]: 1,
|
||||
[tl.RpcError.FORBIDDEN]: 1,
|
||||
[tl.RpcError.NOT_FOUND]: 1,
|
||||
[tl.RpcError.FLOOD]: 1,
|
||||
[tl.RpcError.SEE_OTHER]: 1,
|
||||
[tl.RpcError.NOT_ACCEPTABLE]: 1,
|
||||
}
|
||||
|
||||
/**
|
||||
* Params passed into {@link NetworkManager} by {@link TelegramClient}.
|
||||
* This type is intended for internal usage only.
|
||||
|
@ -48,8 +32,6 @@ export interface NetworkManagerParams {
|
|||
initConnectionOptions?: Partial<Omit<tl.RawInitConnectionRequest, 'apiId' | 'query'>>
|
||||
transport: TransportFactory
|
||||
reconnectionStrategy?: ReconnectionStrategy<PersistentConnectionParams>
|
||||
floodSleepThreshold: number
|
||||
maxRetryCount: number
|
||||
disableUpdates?: boolean
|
||||
testMode: boolean
|
||||
layer: number
|
||||
|
@ -132,7 +114,7 @@ export interface RpcCallOptions {
|
|||
*
|
||||
* If set to `0`, the call will not be retried.
|
||||
*
|
||||
* @default {@link BaseTelegramClientOptions.floodSleepThreshold}
|
||||
* Only applies when the flood waiter middleware is enabled.
|
||||
*/
|
||||
floodSleepThreshold?: number
|
||||
|
||||
|
@ -140,7 +122,8 @@ export interface RpcCallOptions {
|
|||
* If the call results in an internal server error or a flood wait,
|
||||
* the maximum amount of times to retry the call.
|
||||
*
|
||||
* @default {@link BaseTelegramClientOptions.maxRetryCount}
|
||||
* Only applies when the flood waiter middleware and/or
|
||||
* internal errors handler middleware is enabled.
|
||||
*/
|
||||
maxRetryCount?: number
|
||||
|
||||
|
@ -185,6 +168,9 @@ export interface RpcCallOptions {
|
|||
*
|
||||
* Useful for methods like `messages.getBotCallbackAnswer` that reliably return
|
||||
* -503 in case the upstream bot failed to respond.
|
||||
*
|
||||
* Only applies if the internal error handler middleware is enabled,
|
||||
* otherwise -503 is always thrown.
|
||||
*/
|
||||
throw503?: boolean
|
||||
|
||||
|
@ -776,7 +762,11 @@ export class NetworkManager {
|
|||
) => Promise<tl.RpcCallReturn[T['_']] | mtp.RawMt_rpc_error>
|
||||
|
||||
private _composeCall = (middlewares?: Middleware<RpcCallMiddlewareContext, unknown>[]) => {
|
||||
if (!middlewares?.length) {
|
||||
if (!middlewares) {
|
||||
middlewares = defaultMiddlewares()
|
||||
}
|
||||
|
||||
if (!middlewares.length) {
|
||||
return this._call
|
||||
}
|
||||
|
||||
|
@ -793,7 +783,6 @@ export class NetworkManager {
|
|||
})
|
||||
}
|
||||
|
||||
private _floodWaitedRequests = new Map<string, number>()
|
||||
private _call = async <T extends tl.RpcMethod>(
|
||||
message: T,
|
||||
params?: RpcCallOptions,
|
||||
|
@ -802,29 +791,6 @@ export class NetworkManager {
|
|||
throw new MtcuteError('Not connected to any DC')
|
||||
}
|
||||
|
||||
const floodSleepThreshold = params?.floodSleepThreshold ?? this.params.floodSleepThreshold
|
||||
const maxRetryCount = params?.maxRetryCount ?? this.params.maxRetryCount
|
||||
const throw503 = params?.throw503 ?? false
|
||||
|
||||
// do not send requests that are in flood wait
|
||||
if (this._floodWaitedRequests.has(message._)) {
|
||||
const delta = this._floodWaitedRequests.get(message._)! - Date.now()
|
||||
|
||||
if (delta <= 3000) {
|
||||
// flood waits below 3 seconds are "ignored"
|
||||
this._floodWaitedRequests.delete(message._)
|
||||
} else if (delta <= this.params.floodSleepThreshold) {
|
||||
await sleepWithAbort(delta, this.params.stopSignal)
|
||||
this._floodWaitedRequests.delete(message._)
|
||||
} else {
|
||||
const err = tl.RpcError.create(tl.RpcError.FLOOD, 'FLOOD_WAIT_%d')
|
||||
err.seconds = Math.ceil(delta / 1000)
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
let lastError: mtp.RawMt_rpc_error | null = null
|
||||
|
||||
const kind = params?.kind ?? 'main'
|
||||
let manager: DcConnectionManager
|
||||
|
||||
|
@ -838,102 +804,53 @@ export class NetworkManager {
|
|||
|
||||
let multi = manager[kind]
|
||||
|
||||
for (let i = 0; i < maxRetryCount; i++) {
|
||||
const res = await multi.sendRpc(message, params?.timeout, params?.abortSignal, params?.chainId)
|
||||
let res = await multi.sendRpc(message, params?.timeout, params?.abortSignal, params?.chainId)
|
||||
|
||||
if (!isTlRpcError(res)) {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
|
||||
return res
|
||||
}
|
||||
if (!isTlRpcError(res)) {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
|
||||
return res
|
||||
}
|
||||
|
||||
lastError = res
|
||||
|
||||
const err = res.errorMessage
|
||||
|
||||
if (!(res.errorCode in CLIENT_ERRORS)) {
|
||||
if (throw503 && res.errorCode === -503) {
|
||||
throw new MtTimeoutError()
|
||||
}
|
||||
|
||||
this._log.warn('Telegram is having internal issues: %d:%s, retrying', res.errorCode, err)
|
||||
|
||||
if (err === 'WORKER_BUSY_TOO_LONG_RETRY') {
|
||||
// according to tdlib, "it is dangerous to resend query without timeout, so use 1"
|
||||
await sleepWithAbort(1000, this.params.stopSignal)
|
||||
}
|
||||
continue
|
||||
}
|
||||
const err = res.errorMessage
|
||||
|
||||
if (manager === this._primaryDc) {
|
||||
if (
|
||||
err.startsWith('FLOOD_WAIT_') ||
|
||||
err.startsWith('SLOWMODE_WAIT_') ||
|
||||
err.startsWith('FLOOD_TEST_PHONE_WAIT_')
|
||||
err.startsWith('PHONE_MIGRATE_') ||
|
||||
err.startsWith('NETWORK_MIGRATE_') ||
|
||||
err.startsWith('USER_MIGRATE_')
|
||||
) {
|
||||
let seconds = Number(err.lastIndexOf('_') + 1)
|
||||
const newDc = Number(err.slice(err.lastIndexOf('_') + 1))
|
||||
|
||||
if (Number.isNaN(seconds)) {
|
||||
this._log.warn('invalid flood wait error received: %s, ignoring', err)
|
||||
if (Number.isNaN(newDc)) {
|
||||
this._log.warn('invalid migrate error received: %s, ignoring', err)
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
if (!err.startsWith('SLOWMODE_WAIT_')) {
|
||||
// SLOW_MODE_WAIT is chat-specific, not request-specific
|
||||
this._floodWaitedRequests.set(message._, Date.now() + seconds * 1000)
|
||||
if (params?.localMigrate) {
|
||||
manager = await this._getOtherDc(newDc)
|
||||
} else {
|
||||
this._log.info('received %s, migrating to dc %d', err, newDc)
|
||||
|
||||
await this.changePrimaryDc(newDc)
|
||||
manager = this._primaryDc!
|
||||
}
|
||||
|
||||
// In test servers, FLOOD_WAIT_0 has been observed, and sleeping for
|
||||
// such a short amount will cause retries very fast leading to issues
|
||||
if (seconds === 0) {
|
||||
seconds = 1
|
||||
}
|
||||
multi = manager[kind]
|
||||
|
||||
if (seconds <= floodSleepThreshold) {
|
||||
this._log.warn('%s resulted in a flood wait, will retry in %d seconds', message._, seconds)
|
||||
await sleepWithAbort(seconds * 1000, this.params.stopSignal)
|
||||
continue
|
||||
}
|
||||
res = await multi.sendRpc(message, params?.timeout, params?.abortSignal, params?.chainId)
|
||||
}
|
||||
} else if (err === 'AUTH_KEY_UNREGISTERED') {
|
||||
// we can try re-exporting auth from the primary connection
|
||||
this._log.warn('exported auth key error, trying re-exporting..')
|
||||
|
||||
if (manager === this._primaryDc) {
|
||||
if (
|
||||
err.startsWith('PHONE_MIGRATE_') ||
|
||||
err.startsWith('NETWORK_MIGRATE_') ||
|
||||
err.startsWith('USER_MIGRATE_')
|
||||
) {
|
||||
const newDc = Number(err.slice(err.lastIndexOf('_') + 1))
|
||||
await this._exportAuthTo(manager)
|
||||
|
||||
if (Number.isNaN(newDc)) {
|
||||
this._log.warn('invalid migrate error received: %s, ignoring', err)
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
if (params?.localMigrate) {
|
||||
manager = await this._getOtherDc(newDc)
|
||||
} else {
|
||||
this._log.info('received %s, migrating to dc %d', err, newDc)
|
||||
|
||||
await this.changePrimaryDc(newDc)
|
||||
manager = this._primaryDc!
|
||||
}
|
||||
|
||||
multi = manager[kind]
|
||||
|
||||
continue
|
||||
}
|
||||
} else if (err === 'AUTH_KEY_UNREGISTERED') {
|
||||
// we can try re-exporting auth from the primary connection
|
||||
this._log.warn('exported auth key error, trying re-exporting..')
|
||||
|
||||
await this._exportAuthTo(manager)
|
||||
continue
|
||||
}
|
||||
|
||||
return res
|
||||
res = await multi.sendRpc(message, params?.timeout, params?.abortSignal, params?.chainId)
|
||||
}
|
||||
|
||||
return lastError!
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
|
||||
return res
|
||||
}
|
||||
|
||||
changeTransport(factory: TransportFactory): void {
|
||||
|
|
10
packages/core/src/utils/abort-signal.ts
Normal file
10
packages/core/src/utils/abort-signal.ts
Normal file
|
@ -0,0 +1,10 @@
|
|||
export function combineAbortSignals(signal1: AbortSignal, signal2?: AbortSignal): AbortSignal {
|
||||
if (!signal2) return signal1
|
||||
|
||||
const controller = new AbortController()
|
||||
|
||||
signal1.addEventListener('abort', () => controller.abort())
|
||||
signal2.addEventListener('abort', () => controller.abort())
|
||||
|
||||
return controller.signal
|
||||
}
|
Loading…
Reference in a new issue