diff --git a/packages/core/src/highlevel/base.ts b/packages/core/src/highlevel/base.ts index d02e3baf..c507195a 100644 --- a/packages/core/src/highlevel/base.ts +++ b/packages/core/src/highlevel/base.ts @@ -1,15 +1,17 @@ /* eslint-disable @typescript-eslint/require-await */ -import { tl } from '@mtcute/tl' +import { mtp, tl } from '@mtcute/tl' import { MtClient, MtClientOptions } from '../network/client.js' import { ConnectionKind, RpcCallOptions } from '../network/network-manager.js' import { StorageManagerExtraOptions } from '../storage/storage.js' import { MtArgumentError } from '../types/errors.js' import { MustEqual } from '../types/utils.js' +import { reportUnknownError } from '../utils/error-reporting.js' import { asyncResettable, computeNewPasswordHash, computeSrpParams, + isTlRpcError, readStringSession, StringSessionData, writeStringSession, @@ -28,6 +30,16 @@ export interface BaseTelegramClientOptions extends MtClientOptions { updates?: UpdatesManagerParams | false } +function makeRpcError(raw: mtp.RawMt_rpc_error, stack: string, method?: string) { + const error = tl.RpcError.fromTl(raw) + error.stack = `RpcError (${error.code} ${error.text}): ${error.message}\n at ${method}\n${stack + .split('\n') + .slice(2) + .join('\n')}` + + return error +} + export class BaseTelegramClient implements ITelegramClient { readonly updates?: UpdatesManager private _serverUpdatesHandler: ServerUpdateHandler = () => {} @@ -182,6 +194,16 @@ export class BaseTelegramClient implements ITelegramClient { const res = await this.mt.call(message, params) + if (isTlRpcError(res)) { + const error = makeRpcError(res, new Error().stack ?? '', message._) + + if (error.unknown && this.params.enableErrorReporting) { + reportUnknownError(this.log, error, message._) + } + + throw error + } + await this.storage.peers.updatePeersFrom(res) // eslint-disable-next-line @typescript-eslint/no-unsafe-return diff --git a/packages/core/src/network/client.ts b/packages/core/src/network/client.ts index e4de2706..e59c93b1 100644 --- a/packages/core/src/network/client.ts +++ b/packages/core/src/network/client.ts @@ -1,7 +1,7 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import EventEmitter from 'events' -import { tl } from '@mtcute/tl' +import { mtp, tl } from '@mtcute/tl' import { __tlReaderMap as defaultReaderMap } from '@mtcute/tl/binary/reader.js' import { __tlWriterMap as defaultWriterMap } from '@mtcute/tl/binary/writer.js' import { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime' @@ -17,6 +17,7 @@ import { defaultTestDc, defaultTestIpv6Dc, ICryptoProvider, + isTlRpcError, Logger, LogManager, } from '../utils/index.js' @@ -141,20 +142,6 @@ export interface MtClientOptions { */ enableErrorReporting?: boolean - /** - * If true, RPC errors will have a stack trace of the initial `.call()` - * or `.sendForResult()` call position, which drastically improves - * debugging experience.
- * If false, they will have a stack trace of mtcute internals. - * - * Internally this creates a stack capture before every RPC call - * and stores it until the result is received. This might - * use a lot more memory than normal, thus can be disabled here. - * - * @default true - */ - niceStacks?: boolean - /** * Extra parameters for {@link NetworkManager} */ @@ -223,7 +210,6 @@ export class MtClient extends EventEmitter { */ _defaultDcs: DcOptions - private _niceStacks: boolean /** TL layer used by the client */ readonly _layer: number /** TL readers map used by the client */ @@ -231,7 +217,13 @@ export class MtClient extends EventEmitter { /** TL writers map used by the client */ readonly _writerMap: TlWriterMap - readonly _config = new ConfigManager(() => this.call({ _: 'help.getConfig' })) + readonly _config = new ConfigManager(async () => { + const res = await this.call({ _: 'help.getConfig' }) + + if (isTlRpcError(res)) throw new Error(`Failed to get config: ${res.errorMessage}`) + + return res + }) private _emitError?: (err: unknown) => void @@ -264,7 +256,6 @@ export class MtClient extends EventEmitter { } this._defaultDcs = dc - this._niceStacks = params.niceStacks ?? true this._layer = params.overrideLayer ?? tl.LAYER this._readerMap = params.readerMap ?? defaultReaderMap @@ -390,11 +381,9 @@ export class MtClient extends EventEmitter { async call( message: MustEqual, params?: RpcCallOptions, - ): Promise { - const stack = this._niceStacks ? new Error().stack : undefined - + ): Promise { // eslint-disable-next-line @typescript-eslint/no-unsafe-return - return this.network.call(message, params, stack) + return this.network.call(message, params) } /** diff --git a/packages/core/src/network/multi-session-connection.ts b/packages/core/src/network/multi-session-connection.ts index 3ff7021a..c48c0c12 100644 --- a/packages/core/src/network/multi-session-connection.ts +++ b/packages/core/src/network/multi-session-connection.ts @@ -1,6 +1,6 @@ import EventEmitter from 'events' -import { tl } from '@mtcute/tl' +import { mtp, tl } from '@mtcute/tl' import { createControllablePromise, Logger } from '../utils/index.js' import { MtprotoSession } from './mtproto-session.js' @@ -213,11 +213,10 @@ export class MultiSessionConnection extends EventEmitter { sendRpc( request: T, - stack?: string, timeout?: number, abortSignal?: AbortSignal, chainId?: string | number, - ): Promise { + ): Promise { // if (this.params.isMainConnection) { // find the least loaded connection let min = Infinity @@ -233,7 +232,7 @@ export class MultiSessionConnection extends EventEmitter { } } - return this._connections[minIdx].sendRpc(request, stack, timeout, abortSignal, chainId) + return this._connections[minIdx].sendRpc(request, timeout, abortSignal, chainId) // } // round-robin connections diff --git a/packages/core/src/network/network-manager.ts b/packages/core/src/network/network-manager.ts index 3289df76..84412ebf 100644 --- a/packages/core/src/network/network-manager.ts +++ b/packages/core/src/network/network-manager.ts @@ -12,7 +12,7 @@ import { Logger, sleepWithAbort, } from '../utils/index.js' -import { assertTypeIs } from '../utils/type-assertions.js' +import { assertTypeIs, isTlRpcError } from '../utils/type-assertions.js' import { ConfigManager } from './config-manager.js' import { MultiSessionConnection } from './multi-session-connection.js' import { PersistentConnectionParams } from './persistent-connection.js' @@ -634,6 +634,10 @@ export class NetworkManager { dcId: manager.dcId, }) + if (isTlRpcError(auth)) { + throw new MtcuteError(`Failed to export (${auth.errorCode}: ${auth.errorMessage})`) + } + const res = await this.call( { _: 'auth.importAuthorization', @@ -643,6 +647,10 @@ export class NetworkManager { { manager }, ) + if (isTlRpcError(res)) { + throw new MtcuteError(`Failed to import (${res.errorCode}: ${res.errorMessage})`) + } + assertTypeIs('auth.importAuthorization', res, 'auth.authorization') promise.resolve() @@ -748,8 +756,7 @@ export class NetworkManager { async call( message: T, params?: RpcCallOptions, - stack?: string, - ): Promise { + ): Promise { if (!this._primaryDc) { throw new MtcuteError('Not connected to any DC') } @@ -775,7 +782,7 @@ export class NetworkManager { } } - let lastError: Error | null = null + let lastError: mtp.RawMt_rpc_error | null = null const kind = params?.kind ?? 'main' let manager: DcConnectionManager @@ -791,85 +798,99 @@ export class NetworkManager { let multi = manager[kind] for (let i = 0; i < maxRetryCount; i++) { - try { - const res = await multi.sendRpc(message, stack, params?.timeout, params?.abortSignal, params?.chainId) + const res = await multi.sendRpc(message, params?.timeout, params?.abortSignal, params?.chainId) + if (!isTlRpcError(res)) { // eslint-disable-next-line @typescript-eslint/no-unsafe-return return res - // eslint-disable-next-line @typescript-eslint/no-explicit-any - } catch (e: any) { - lastError = e as Error + } - if (!tl.RpcError.is(e)) { - throw e + lastError = res + + const err = res.errorMessage + + if (!(res.errorCode in CLIENT_ERRORS)) { + if (throw503 && res.errorCode === -503) { + throw new MtTimeoutError() } - if (!(e.code in CLIENT_ERRORS)) { - if (throw503 && e.code === -503) { - throw new MtTimeoutError() - } + this._log.warn('Telegram is having internal issues: %d:%s, retrying', res.errorCode, err) - this._log.warn( - 'Telegram is having internal issues: %d:%s (%s), retrying', - e.code, - e.text, - e.message, - ) + if (err === 'WORKER_BUSY_TOO_LONG_RETRY') { + // according to tdlib, "it is dangerous to resend query without timeout, so use 1" + await sleepWithAbort(1000, this.params.stopSignal) + } + continue + } - if (e.text === 'WORKER_BUSY_TOO_LONG_RETRY') { - // according to tdlib, "it is dangerous to resend query without timeout, so use 1" - await sleepWithAbort(1000, this.params.stopSignal) - } + if ( + err.startsWith('FLOOD_WAIT_') || + err.startsWith('SLOWMODE_WAIT_') || + err.startsWith('FLOOD_TEST_PHONE_WAIT_') + ) { + let seconds = Number(err.lastIndexOf('_') + 1) + + if (Number.isNaN(seconds)) { + this._log.warn('invalid flood wait error received: %s, ignoring', err) + + return res + } + + if (!err.startsWith('SLOWMODE_WAIT_')) { + // SLOW_MODE_WAIT is chat-specific, not request-specific + this._floodWaitedRequests.set(message._, Date.now() + seconds * 1000) + } + + // In test servers, FLOOD_WAIT_0 has been observed, and sleeping for + // such a short amount will cause retries very fast leading to issues + if (seconds === 0) { + seconds = 1 + } + + if (seconds <= floodSleepThreshold) { + this._log.warn('%s resulted in a flood wait, will retry in %d seconds', message._, seconds) + await sleepWithAbort(seconds * 1000, this.params.stopSignal) continue } + } - if (e.is('FLOOD_WAIT_%d') || e.is('SLOWMODE_WAIT_%d') || e.is('FLOOD_TEST_PHONE_WAIT_%d')) { - if (e.text !== 'SLOWMODE_WAIT_%d') { - // SLOW_MODE_WAIT is chat-specific, not request-specific - this._floodWaitedRequests.set(message._, Date.now() + e.seconds * 1000) + if (manager === this._primaryDc) { + if ( + err.startsWith('PHONE_MIGRATE_') || + err.startsWith('NETWORK_MIGRATE_') || + err.startsWith('USER_MIGRATE_') + ) { + const newDc = Number(err.slice(err.lastIndexOf('_') + 1)) + + if (Number.isNaN(newDc)) { + this._log.warn('invalid migrate error received: %s, ignoring', err) + + return res } - // In test servers, FLOOD_WAIT_0 has been observed, and sleeping for - // such a short amount will cause retries very fast leading to issues - if (e.seconds === 0) { - e.seconds = 1 + if (params?.localMigrate) { + manager = await this._getOtherDc(newDc) + } else { + this._log.info('Migrate error, new dc = %d', newDc) + + await this.changePrimaryDc(newDc) + manager = this._primaryDc! } - if (e.seconds <= floodSleepThreshold) { - this._log.warn('%s resulted in a flood wait, will retry in %d seconds', message._, e.seconds) - await sleepWithAbort(e.seconds * 1000, this.params.stopSignal) - continue - } - } + multi = manager[kind] - if (manager === this._primaryDc) { - if (e.is('PHONE_MIGRATE_%d') || e.is('NETWORK_MIGRATE_%d') || e.is('USER_MIGRATE_%d')) { - if (params?.localMigrate) { - manager = await this._getOtherDc(e.newDc) - } else { - this._log.info('Migrate error, new dc = %d', e.newDc) - - await this.changePrimaryDc(e.newDc) - manager = this._primaryDc! - } - - multi = manager[kind] - - continue - } - } else if (e.is('AUTH_KEY_UNREGISTERED')) { - // we can try re-exporting auth from the primary connection - this._log.warn('exported auth key error, trying re-exporting..') - - await this._exportAuthTo(manager) continue } + } else if (err === 'AUTH_KEY_UNREGISTERED') { + // we can try re-exporting auth from the primary connection + this._log.warn('exported auth key error, trying re-exporting..') - throw e + await this._exportAuthTo(manager) + continue } } - throw lastError! + return lastError! } changeTransport(factory: TransportFactory): void { diff --git a/packages/core/src/network/session-connection.ts b/packages/core/src/network/session-connection.ts index 98548f4a..153fc64c 100644 --- a/packages/core/src/network/session-connection.ts +++ b/packages/core/src/network/session-connection.ts @@ -8,7 +8,6 @@ import { TlBinaryReader, TlBinaryWriter, TlReaderMap, TlSerializationCounter, Tl import { getPlatform } from '../platform.js' import { MtArgumentError, MtcuteError, MtTimeoutError } from '../types/index.js' import { createAesIgeForMessageOld } from '../utils/crypto/mtproto.js' -import { reportUnknownError } from '../utils/error-reporting.js' import { concatBuffers, ControllablePromise, @@ -28,7 +27,6 @@ import { TransportError } from './transports/abstract.js' export interface SessionConnectionParams extends PersistentConnectionParams { initConnection: tl.RawInitConnectionRequest inactivityTimeout?: number - niceStacks?: boolean enableErrorReporting: boolean layer: number disableUpdates?: boolean @@ -61,13 +59,6 @@ const RPC_ERROR_ID = 0x2144ca19 const INVOKE_AFTER_MSG_ID = 0xcb9f372d const INVOKE_AFTER_MSG_SIZE = 12 // 8 (invokeAfterMsg) + 4 (msg_id) -function makeNiceStack(error: tl.RpcError, stack: string, method?: string) { - error.stack = `RpcError (${error.code} ${error.text}): ${error.message}\n at ${method}\n${stack - .split('\n') - .slice(2) - .join('\n')}` -} - /** * A connection to a single DC. */ @@ -867,17 +858,7 @@ export class SessionConnection extends PersistentConnection { } } - const error = tl.RpcError.fromTl(res) - - if (this.params.niceStacks !== false) { - makeNiceStack(error, rpc.stack!, rpc.method) - } - - if (error.unknown && this.params.enableErrorReporting) { - reportUnknownError(this.log, error, rpc.method) - } - - rpc.promise.reject(error) + rpc.promise.resolve(res) } else { this.log.debug('received rpc_result (%s) for request %l (%s)', result._, reqMsgId, rpc.method) @@ -1350,7 +1331,6 @@ export class SessionConnection extends PersistentConnection { sendRpc( request: T, - stack?: string, timeout?: number, abortSignal?: AbortSignal, chainId?: string | number, @@ -1359,10 +1339,6 @@ export class SessionConnection extends PersistentConnection { this._rescheduleInactivity() } - if (!stack && this.params.niceStacks !== false) { - stack = new Error().stack - } - const method = request._ let obj: tl.TlObject = request @@ -1436,7 +1412,6 @@ export class SessionConnection extends PersistentConnection { method, promise: createControllablePromise(), data: content, - stack, // we will need to know size of gzip_packed overhead in _flush() gzipOverhead: shouldGzip ? 4 + TlSerializationCounter.countBytesOverhead(content.length) : 0, initConn, @@ -1498,14 +1473,11 @@ export class SessionConnection extends PersistentConnection { } if (onTimeout) { - // todo: replace with MtTimeoutError - const error = new tl.RpcError(400, 'Client timeout') - - if (this.params.niceStacks !== false) { - makeNiceStack(error, rpc.stack!, rpc.method) - } - - rpc.promise.reject(error) + rpc.promise.resolve({ + _: 'mt_rpc_error', + errorCode: 400, + errorMessage: 'TIMEOUT', + } satisfies mtp.RawMt_rpc_error) } else if (abortSignal) { rpc.promise.reject(abortSignal.reason) } diff --git a/packages/core/src/utils/type-assertions.ts b/packages/core/src/utils/type-assertions.ts index d075ca51..c28ed805 100644 --- a/packages/core/src/utils/type-assertions.ts +++ b/packages/core/src/utils/type-assertions.ts @@ -85,3 +85,7 @@ export function assertTrue(context: string, cond: boolean): asserts cond { throw new MtTypeAssertionError(context, 'true', 'false') } } + +export function isTlRpcError(obj: unknown): obj is mtp.RawMt_rpc_error { + return typeof obj === 'object' && obj !== null && (obj as { _: string })._ === 'mt_rpc_error' +}