refactor(core): monadic internal handling of rpc errors

This commit is contained in:
alina 🌸 2024-07-03 18:26:24 +03:00
parent e432fdb5b3
commit c667399b4d
Signed by: teidesu
SSH key fingerprint: SHA256:uNeCpw6aTSU4aIObXLvHfLkDa82HWH9EiOj9AXOIRpI
6 changed files with 129 additions and 122 deletions

View file

@ -1,15 +1,17 @@
/* eslint-disable @typescript-eslint/require-await */ /* eslint-disable @typescript-eslint/require-await */
import { tl } from '@mtcute/tl' import { mtp, tl } from '@mtcute/tl'
import { MtClient, MtClientOptions } from '../network/client.js' import { MtClient, MtClientOptions } from '../network/client.js'
import { ConnectionKind, RpcCallOptions } from '../network/network-manager.js' import { ConnectionKind, RpcCallOptions } from '../network/network-manager.js'
import { StorageManagerExtraOptions } from '../storage/storage.js' import { StorageManagerExtraOptions } from '../storage/storage.js'
import { MtArgumentError } from '../types/errors.js' import { MtArgumentError } from '../types/errors.js'
import { MustEqual } from '../types/utils.js' import { MustEqual } from '../types/utils.js'
import { reportUnknownError } from '../utils/error-reporting.js'
import { import {
asyncResettable, asyncResettable,
computeNewPasswordHash, computeNewPasswordHash,
computeSrpParams, computeSrpParams,
isTlRpcError,
readStringSession, readStringSession,
StringSessionData, StringSessionData,
writeStringSession, writeStringSession,
@ -28,6 +30,16 @@ export interface BaseTelegramClientOptions extends MtClientOptions {
updates?: UpdatesManagerParams | false updates?: UpdatesManagerParams | false
} }
function makeRpcError(raw: mtp.RawMt_rpc_error, stack: string, method?: string) {
const error = tl.RpcError.fromTl(raw)
error.stack = `RpcError (${error.code} ${error.text}): ${error.message}\n at ${method}\n${stack
.split('\n')
.slice(2)
.join('\n')}`
return error
}
export class BaseTelegramClient implements ITelegramClient { export class BaseTelegramClient implements ITelegramClient {
readonly updates?: UpdatesManager readonly updates?: UpdatesManager
private _serverUpdatesHandler: ServerUpdateHandler = () => {} private _serverUpdatesHandler: ServerUpdateHandler = () => {}
@ -182,6 +194,16 @@ export class BaseTelegramClient implements ITelegramClient {
const res = await this.mt.call(message, params) const res = await this.mt.call(message, params)
if (isTlRpcError(res)) {
const error = makeRpcError(res, new Error().stack ?? '', message._)
if (error.unknown && this.params.enableErrorReporting) {
reportUnknownError(this.log, error, message._)
}
throw error
}
await this.storage.peers.updatePeersFrom(res) await this.storage.peers.updatePeersFrom(res)
// eslint-disable-next-line @typescript-eslint/no-unsafe-return // eslint-disable-next-line @typescript-eslint/no-unsafe-return

View file

@ -1,7 +1,7 @@
/* eslint-disable @typescript-eslint/no-explicit-any */ /* eslint-disable @typescript-eslint/no-explicit-any */
import EventEmitter from 'events' import EventEmitter from 'events'
import { tl } from '@mtcute/tl' import { mtp, tl } from '@mtcute/tl'
import { __tlReaderMap as defaultReaderMap } from '@mtcute/tl/binary/reader.js' import { __tlReaderMap as defaultReaderMap } from '@mtcute/tl/binary/reader.js'
import { __tlWriterMap as defaultWriterMap } from '@mtcute/tl/binary/writer.js' import { __tlWriterMap as defaultWriterMap } from '@mtcute/tl/binary/writer.js'
import { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime' import { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime'
@ -17,6 +17,7 @@ import {
defaultTestDc, defaultTestDc,
defaultTestIpv6Dc, defaultTestIpv6Dc,
ICryptoProvider, ICryptoProvider,
isTlRpcError,
Logger, Logger,
LogManager, LogManager,
} from '../utils/index.js' } from '../utils/index.js'
@ -141,20 +142,6 @@ export interface MtClientOptions {
*/ */
enableErrorReporting?: boolean enableErrorReporting?: boolean
/**
* If true, RPC errors will have a stack trace of the initial `.call()`
* or `.sendForResult()` call position, which drastically improves
* debugging experience.<br>
* If false, they will have a stack trace of mtcute internals.
*
* Internally this creates a stack capture before every RPC call
* and stores it until the result is received. This might
* use a lot more memory than normal, thus can be disabled here.
*
* @default true
*/
niceStacks?: boolean
/** /**
* Extra parameters for {@link NetworkManager} * Extra parameters for {@link NetworkManager}
*/ */
@ -223,7 +210,6 @@ export class MtClient extends EventEmitter {
*/ */
_defaultDcs: DcOptions _defaultDcs: DcOptions
private _niceStacks: boolean
/** TL layer used by the client */ /** TL layer used by the client */
readonly _layer: number readonly _layer: number
/** TL readers map used by the client */ /** TL readers map used by the client */
@ -231,7 +217,13 @@ export class MtClient extends EventEmitter {
/** TL writers map used by the client */ /** TL writers map used by the client */
readonly _writerMap: TlWriterMap readonly _writerMap: TlWriterMap
readonly _config = new ConfigManager(() => this.call({ _: 'help.getConfig' })) readonly _config = new ConfigManager(async () => {
const res = await this.call({ _: 'help.getConfig' })
if (isTlRpcError(res)) throw new Error(`Failed to get config: ${res.errorMessage}`)
return res
})
private _emitError?: (err: unknown) => void private _emitError?: (err: unknown) => void
@ -264,7 +256,6 @@ export class MtClient extends EventEmitter {
} }
this._defaultDcs = dc this._defaultDcs = dc
this._niceStacks = params.niceStacks ?? true
this._layer = params.overrideLayer ?? tl.LAYER this._layer = params.overrideLayer ?? tl.LAYER
this._readerMap = params.readerMap ?? defaultReaderMap this._readerMap = params.readerMap ?? defaultReaderMap
@ -390,11 +381,9 @@ export class MtClient extends EventEmitter {
async call<T extends tl.RpcMethod>( async call<T extends tl.RpcMethod>(
message: MustEqual<T, tl.RpcMethod>, message: MustEqual<T, tl.RpcMethod>,
params?: RpcCallOptions, params?: RpcCallOptions,
): Promise<tl.RpcCallReturn[T['_']]> { ): Promise<tl.RpcCallReturn[T['_']] | mtp.RawMt_rpc_error> {
const stack = this._niceStacks ? new Error().stack : undefined
// eslint-disable-next-line @typescript-eslint/no-unsafe-return // eslint-disable-next-line @typescript-eslint/no-unsafe-return
return this.network.call(message, params, stack) return this.network.call(message, params)
} }
/** /**

View file

@ -1,6 +1,6 @@
import EventEmitter from 'events' import EventEmitter from 'events'
import { tl } from '@mtcute/tl' import { mtp, tl } from '@mtcute/tl'
import { createControllablePromise, Logger } from '../utils/index.js' import { createControllablePromise, Logger } from '../utils/index.js'
import { MtprotoSession } from './mtproto-session.js' import { MtprotoSession } from './mtproto-session.js'
@ -213,11 +213,10 @@ export class MultiSessionConnection extends EventEmitter {
sendRpc<T extends tl.RpcMethod>( sendRpc<T extends tl.RpcMethod>(
request: T, request: T,
stack?: string,
timeout?: number, timeout?: number,
abortSignal?: AbortSignal, abortSignal?: AbortSignal,
chainId?: string | number, chainId?: string | number,
): Promise<tl.RpcCallReturn[T['_']]> { ): Promise<tl.RpcCallReturn[T['_']] | mtp.RawMt_rpc_error> {
// if (this.params.isMainConnection) { // if (this.params.isMainConnection) {
// find the least loaded connection // find the least loaded connection
let min = Infinity let min = Infinity
@ -233,7 +232,7 @@ export class MultiSessionConnection extends EventEmitter {
} }
} }
return this._connections[minIdx].sendRpc(request, stack, timeout, abortSignal, chainId) return this._connections[minIdx].sendRpc(request, timeout, abortSignal, chainId)
// } // }
// round-robin connections // round-robin connections

View file

@ -12,7 +12,7 @@ import {
Logger, Logger,
sleepWithAbort, sleepWithAbort,
} from '../utils/index.js' } from '../utils/index.js'
import { assertTypeIs } from '../utils/type-assertions.js' import { assertTypeIs, isTlRpcError } from '../utils/type-assertions.js'
import { ConfigManager } from './config-manager.js' import { ConfigManager } from './config-manager.js'
import { MultiSessionConnection } from './multi-session-connection.js' import { MultiSessionConnection } from './multi-session-connection.js'
import { PersistentConnectionParams } from './persistent-connection.js' import { PersistentConnectionParams } from './persistent-connection.js'
@ -634,6 +634,10 @@ export class NetworkManager {
dcId: manager.dcId, dcId: manager.dcId,
}) })
if (isTlRpcError(auth)) {
throw new MtcuteError(`Failed to export (${auth.errorCode}: ${auth.errorMessage})`)
}
const res = await this.call( const res = await this.call(
{ {
_: 'auth.importAuthorization', _: 'auth.importAuthorization',
@ -643,6 +647,10 @@ export class NetworkManager {
{ manager }, { manager },
) )
if (isTlRpcError(res)) {
throw new MtcuteError(`Failed to import (${res.errorCode}: ${res.errorMessage})`)
}
assertTypeIs('auth.importAuthorization', res, 'auth.authorization') assertTypeIs('auth.importAuthorization', res, 'auth.authorization')
promise.resolve() promise.resolve()
@ -748,8 +756,7 @@ export class NetworkManager {
async call<T extends tl.RpcMethod>( async call<T extends tl.RpcMethod>(
message: T, message: T,
params?: RpcCallOptions, params?: RpcCallOptions,
stack?: string, ): Promise<tl.RpcCallReturn[T['_']] | mtp.RawMt_rpc_error> {
): Promise<tl.RpcCallReturn[T['_']]> {
if (!this._primaryDc) { if (!this._primaryDc) {
throw new MtcuteError('Not connected to any DC') throw new MtcuteError('Not connected to any DC')
} }
@ -775,7 +782,7 @@ export class NetworkManager {
} }
} }
let lastError: Error | null = null let lastError: mtp.RawMt_rpc_error | null = null
const kind = params?.kind ?? 'main' const kind = params?.kind ?? 'main'
let manager: DcConnectionManager let manager: DcConnectionManager
@ -791,85 +798,99 @@ export class NetworkManager {
let multi = manager[kind] let multi = manager[kind]
for (let i = 0; i < maxRetryCount; i++) { for (let i = 0; i < maxRetryCount; i++) {
try { const res = await multi.sendRpc(message, params?.timeout, params?.abortSignal, params?.chainId)
const res = await multi.sendRpc(message, stack, params?.timeout, params?.abortSignal, params?.chainId)
if (!isTlRpcError(res)) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-return // eslint-disable-next-line @typescript-eslint/no-unsafe-return
return res return res
// eslint-disable-next-line @typescript-eslint/no-explicit-any }
} catch (e: any) {
lastError = e as Error
if (!tl.RpcError.is(e)) { lastError = res
throw e
const err = res.errorMessage
if (!(res.errorCode in CLIENT_ERRORS)) {
if (throw503 && res.errorCode === -503) {
throw new MtTimeoutError()
} }
if (!(e.code in CLIENT_ERRORS)) { this._log.warn('Telegram is having internal issues: %d:%s, retrying', res.errorCode, err)
if (throw503 && e.code === -503) {
throw new MtTimeoutError()
}
this._log.warn( if (err === 'WORKER_BUSY_TOO_LONG_RETRY') {
'Telegram is having internal issues: %d:%s (%s), retrying', // according to tdlib, "it is dangerous to resend query without timeout, so use 1"
e.code, await sleepWithAbort(1000, this.params.stopSignal)
e.text, }
e.message, continue
) }
if (e.text === 'WORKER_BUSY_TOO_LONG_RETRY') { if (
// according to tdlib, "it is dangerous to resend query without timeout, so use 1" err.startsWith('FLOOD_WAIT_') ||
await sleepWithAbort(1000, this.params.stopSignal) err.startsWith('SLOWMODE_WAIT_') ||
} err.startsWith('FLOOD_TEST_PHONE_WAIT_')
) {
let seconds = Number(err.lastIndexOf('_') + 1)
if (Number.isNaN(seconds)) {
this._log.warn('invalid flood wait error received: %s, ignoring', err)
return res
}
if (!err.startsWith('SLOWMODE_WAIT_')) {
// SLOW_MODE_WAIT is chat-specific, not request-specific
this._floodWaitedRequests.set(message._, Date.now() + seconds * 1000)
}
// In test servers, FLOOD_WAIT_0 has been observed, and sleeping for
// such a short amount will cause retries very fast leading to issues
if (seconds === 0) {
seconds = 1
}
if (seconds <= floodSleepThreshold) {
this._log.warn('%s resulted in a flood wait, will retry in %d seconds', message._, seconds)
await sleepWithAbort(seconds * 1000, this.params.stopSignal)
continue continue
} }
}
if (e.is('FLOOD_WAIT_%d') || e.is('SLOWMODE_WAIT_%d') || e.is('FLOOD_TEST_PHONE_WAIT_%d')) { if (manager === this._primaryDc) {
if (e.text !== 'SLOWMODE_WAIT_%d') { if (
// SLOW_MODE_WAIT is chat-specific, not request-specific err.startsWith('PHONE_MIGRATE_') ||
this._floodWaitedRequests.set(message._, Date.now() + e.seconds * 1000) err.startsWith('NETWORK_MIGRATE_') ||
err.startsWith('USER_MIGRATE_')
) {
const newDc = Number(err.slice(err.lastIndexOf('_') + 1))
if (Number.isNaN(newDc)) {
this._log.warn('invalid migrate error received: %s, ignoring', err)
return res
} }
// In test servers, FLOOD_WAIT_0 has been observed, and sleeping for if (params?.localMigrate) {
// such a short amount will cause retries very fast leading to issues manager = await this._getOtherDc(newDc)
if (e.seconds === 0) { } else {
e.seconds = 1 this._log.info('Migrate error, new dc = %d', newDc)
await this.changePrimaryDc(newDc)
manager = this._primaryDc!
} }
if (e.seconds <= floodSleepThreshold) { multi = manager[kind]
this._log.warn('%s resulted in a flood wait, will retry in %d seconds', message._, e.seconds)
await sleepWithAbort(e.seconds * 1000, this.params.stopSignal)
continue
}
}
if (manager === this._primaryDc) {
if (e.is('PHONE_MIGRATE_%d') || e.is('NETWORK_MIGRATE_%d') || e.is('USER_MIGRATE_%d')) {
if (params?.localMigrate) {
manager = await this._getOtherDc(e.newDc)
} else {
this._log.info('Migrate error, new dc = %d', e.newDc)
await this.changePrimaryDc(e.newDc)
manager = this._primaryDc!
}
multi = manager[kind]
continue
}
} else if (e.is('AUTH_KEY_UNREGISTERED')) {
// we can try re-exporting auth from the primary connection
this._log.warn('exported auth key error, trying re-exporting..')
await this._exportAuthTo(manager)
continue continue
} }
} else if (err === 'AUTH_KEY_UNREGISTERED') {
// we can try re-exporting auth from the primary connection
this._log.warn('exported auth key error, trying re-exporting..')
throw e await this._exportAuthTo(manager)
continue
} }
} }
throw lastError! return lastError!
} }
changeTransport(factory: TransportFactory): void { changeTransport(factory: TransportFactory): void {

View file

@ -8,7 +8,6 @@ import { TlBinaryReader, TlBinaryWriter, TlReaderMap, TlSerializationCounter, Tl
import { getPlatform } from '../platform.js' import { getPlatform } from '../platform.js'
import { MtArgumentError, MtcuteError, MtTimeoutError } from '../types/index.js' import { MtArgumentError, MtcuteError, MtTimeoutError } from '../types/index.js'
import { createAesIgeForMessageOld } from '../utils/crypto/mtproto.js' import { createAesIgeForMessageOld } from '../utils/crypto/mtproto.js'
import { reportUnknownError } from '../utils/error-reporting.js'
import { import {
concatBuffers, concatBuffers,
ControllablePromise, ControllablePromise,
@ -28,7 +27,6 @@ import { TransportError } from './transports/abstract.js'
export interface SessionConnectionParams extends PersistentConnectionParams { export interface SessionConnectionParams extends PersistentConnectionParams {
initConnection: tl.RawInitConnectionRequest initConnection: tl.RawInitConnectionRequest
inactivityTimeout?: number inactivityTimeout?: number
niceStacks?: boolean
enableErrorReporting: boolean enableErrorReporting: boolean
layer: number layer: number
disableUpdates?: boolean disableUpdates?: boolean
@ -61,13 +59,6 @@ const RPC_ERROR_ID = 0x2144ca19
const INVOKE_AFTER_MSG_ID = 0xcb9f372d const INVOKE_AFTER_MSG_ID = 0xcb9f372d
const INVOKE_AFTER_MSG_SIZE = 12 // 8 (invokeAfterMsg) + 4 (msg_id) const INVOKE_AFTER_MSG_SIZE = 12 // 8 (invokeAfterMsg) + 4 (msg_id)
function makeNiceStack(error: tl.RpcError, stack: string, method?: string) {
error.stack = `RpcError (${error.code} ${error.text}): ${error.message}\n at ${method}\n${stack
.split('\n')
.slice(2)
.join('\n')}`
}
/** /**
* A connection to a single DC. * A connection to a single DC.
*/ */
@ -867,17 +858,7 @@ export class SessionConnection extends PersistentConnection {
} }
} }
const error = tl.RpcError.fromTl(res) rpc.promise.resolve(res)
if (this.params.niceStacks !== false) {
makeNiceStack(error, rpc.stack!, rpc.method)
}
if (error.unknown && this.params.enableErrorReporting) {
reportUnknownError(this.log, error, rpc.method)
}
rpc.promise.reject(error)
} else { } else {
this.log.debug('received rpc_result (%s) for request %l (%s)', result._, reqMsgId, rpc.method) this.log.debug('received rpc_result (%s) for request %l (%s)', result._, reqMsgId, rpc.method)
@ -1350,7 +1331,6 @@ export class SessionConnection extends PersistentConnection {
sendRpc<T extends tl.RpcMethod>( sendRpc<T extends tl.RpcMethod>(
request: T, request: T,
stack?: string,
timeout?: number, timeout?: number,
abortSignal?: AbortSignal, abortSignal?: AbortSignal,
chainId?: string | number, chainId?: string | number,
@ -1359,10 +1339,6 @@ export class SessionConnection extends PersistentConnection {
this._rescheduleInactivity() this._rescheduleInactivity()
} }
if (!stack && this.params.niceStacks !== false) {
stack = new Error().stack
}
const method = request._ const method = request._
let obj: tl.TlObject = request let obj: tl.TlObject = request
@ -1436,7 +1412,6 @@ export class SessionConnection extends PersistentConnection {
method, method,
promise: createControllablePromise(), promise: createControllablePromise(),
data: content, data: content,
stack,
// we will need to know size of gzip_packed overhead in _flush() // we will need to know size of gzip_packed overhead in _flush()
gzipOverhead: shouldGzip ? 4 + TlSerializationCounter.countBytesOverhead(content.length) : 0, gzipOverhead: shouldGzip ? 4 + TlSerializationCounter.countBytesOverhead(content.length) : 0,
initConn, initConn,
@ -1498,14 +1473,11 @@ export class SessionConnection extends PersistentConnection {
} }
if (onTimeout) { if (onTimeout) {
// todo: replace with MtTimeoutError rpc.promise.resolve({
const error = new tl.RpcError(400, 'Client timeout') _: 'mt_rpc_error',
errorCode: 400,
if (this.params.niceStacks !== false) { errorMessage: 'TIMEOUT',
makeNiceStack(error, rpc.stack!, rpc.method) } satisfies mtp.RawMt_rpc_error)
}
rpc.promise.reject(error)
} else if (abortSignal) { } else if (abortSignal) {
rpc.promise.reject(abortSignal.reason) rpc.promise.reject(abortSignal.reason)
} }

View file

@ -85,3 +85,7 @@ export function assertTrue(context: string, cond: boolean): asserts cond {
throw new MtTypeAssertionError(context, 'true', 'false') throw new MtTypeAssertionError(context, 'true', 'false')
} }
} }
export function isTlRpcError(obj: unknown): obj is mtp.RawMt_rpc_error {
return typeof obj === 'object' && obj !== null && (obj as { _: string })._ === 'mt_rpc_error'
}