From 4852fe0301c59198e4777d2947b0d2b11edc1359 Mon Sep 17 00:00:00 2001 From: teidesu Date: Sun, 16 May 2021 02:52:13 +0300 Subject: [PATCH] feat(core): support rpc timeouts --- packages/core/src/base-client.ts | 3 +- .../core/src/network/telegram-connection.ts | 80 ++++++++++++++----- 2 files changed, 62 insertions(+), 21 deletions(-) diff --git a/packages/core/src/base-client.ts b/packages/core/src/base-client.ts index bc67fb3c..dd749a2b 100644 --- a/packages/core/src/base-client.ts +++ b/packages/core/src/base-client.ts @@ -458,6 +458,7 @@ export class BaseTelegramClient { params?: { throwFlood?: boolean connection?: TelegramConnection + timeout?: number } ): Promise { if (!this._connected) { @@ -494,7 +495,7 @@ export class BaseTelegramClient { for (let i = 0; i < this._rpcRetryCount; i++) { try { - const res = await connection.sendForResult(message, stack) + const res = await connection.sendForResult(message, stack, params?.timeout) await this._cachePeersFrom(res) return res diff --git a/packages/core/src/network/telegram-connection.ts b/packages/core/src/network/telegram-connection.ts index 3d6cd46a..8fd53c7e 100644 --- a/packages/core/src/network/telegram-connection.ts +++ b/packages/core/src/network/telegram-connection.ts @@ -16,9 +16,17 @@ import { import { debounce } from '../utils/function-utils' import { bufferToBigInt, ulongToLong } from '../utils/bigint-utils' import { randomBytes } from '../utils/buffer-utils' -import { BadRequestError, createRpcErrorFromTl } from '@mtcute/tl/errors' +import { BadRequestError, createRpcErrorFromTl, RpcError, RpcTimeoutError, TimeoutError } from '@mtcute/tl/errors' import { LruStringSet } from '../utils/lru-string-set' +function makeNiceStack(error: RpcError, stack: string, method?: string) { + error.stack = `${error.constructor.name} (${error.code} ${ + error.text + }): ${error.message}\n at ${ + method + }\n${stack.split('\n').slice(2).join('\n')}` +} + const _debug = require('debug') const debug = _debug('mtcute:conn') @@ -56,6 +64,8 @@ interface PendingMessage { stack?: string promise: ControllablePromise message: Buffer + // timeout after which the call will be cancelled + cancel?: NodeJS.Timeout } // TODO: error handling basically everywhere, most importantly (de-)serialization errors @@ -300,14 +310,11 @@ export class TelegramConnection extends PersistentConnection { if (message.result._ === 'mt_rpc_error') { const error = createRpcErrorFromTl(message.result) if (this.params.niceStacks !== false) { - error.stack = `${error.constructor.name} (${error.code} ${ - error.text - }): ${error.message}\n at ${ - pending.method - }\n${pending.stack!.split('\n').slice(2).join('\n')}` + makeNiceStack(error, pending.stack!, pending.method) } pending.promise.reject(error) } else { + if (pending.cancel) clearTimeout(pending.cancel) pending.promise.resolve(message.result) } @@ -326,7 +333,10 @@ export class TelegramConnection extends PersistentConnection { } if (this._pendingRpcCalls[msgId]) { - this._pendingRpcCalls[msgId].promise.resolve(message) + const pending = this._pendingRpcCalls[msgId] + if (pending.cancel) clearTimeout(pending.cancel) + + pending.promise.resolve(message) delete this._pendingRpcCalls[msgId] } else { debug('pong to unknown ping %o', message) @@ -380,12 +390,16 @@ export class TelegramConnection extends PersistentConnection { // does not seem to happen, but whatever this._seqNo -= 16 } else if (this._pendingRpcCalls[badMsgId]) { - this._pendingRpcCalls[badMsgId].promise.reject( - new BadRequestError( - 'BAD_REQUEST', - 'bad_msg_notification ' + message.errorCode - ) + const pending = this._pendingRpcCalls[badMsgId] + + const error = new BadRequestError( + 'BAD_REQUEST', + 'bad_msg_notification ' + message.errorCode ) + if (this.params.niceStacks !== false) { + makeNiceStack(error, pending.stack!, pending.method) + } + pending.promise.reject(error) delete this._pendingRpcCalls[badMsgId] return } @@ -402,7 +416,10 @@ export class TelegramConnection extends PersistentConnection { message.msgIds.forEach((idLong) => { const id = idLong.toString(16) if (this._pendingRpcCalls[id]?.method === 'auth.logOut') { - this._pendingRpcCalls[id].promise.resolve(true) + const pending = this._pendingRpcCalls[id] + if (pending.cancel) clearTimeout(pending.cancel) + + pending.promise.resolve(true) delete this._pendingRpcCalls[id] } }) @@ -495,7 +512,10 @@ export class TelegramConnection extends PersistentConnection { const reqMsgId = message.reqMsgId.toString(16) if (reqMsgId in this._pendingRpcCalls) { - this._pendingRpcCalls[reqMsgId].promise.resolve(message) + const pending = this._pendingRpcCalls[reqMsgId] + if (pending.cancel) clearTimeout(pending.cancel) + + pending.promise.resolve(message) } } @@ -540,14 +560,16 @@ export class TelegramConnection extends PersistentConnection { async _sendBufferForResult( method: string, message: Buffer, - stack?: string + stack?: string, + timeout?: number, ): Promise async _sendBufferForResult( method: string | PendingMessage, message?: Buffer, - stack?: string + stack?: string, + timeout?: number, ): Promise { - if (this.params.niceStacks !== false && !stack) { + if (typeof method === 'string' && this.params.niceStacks !== false && !stack) { stack = new Error().stack } @@ -582,9 +604,25 @@ export class TelegramConnection extends PersistentConnection { } const messageId = this._getMessageId() + const messageIdStr = messageId.toString(16) const seqNo = this._getSeqNo(true) - this._pendingRpcCalls[messageId.toString(16)] = pending + this._pendingRpcCalls[messageIdStr] = pending + + if (timeout) { + pending.cancel = setTimeout(() => { + const pending = this._pendingRpcCalls[messageIdStr] + if (pending) { + const error = new RpcTimeoutError(timeout) + if (this.params.niceStacks !== false) { + makeNiceStack(error, pending.stack!, pending.method) + } + + pending.promise.reject(error) + delete this._pendingRpcCalls[messageIdStr] + } + }, timeout) + } const encrypted = await this._mtproto.encryptMessage( content, @@ -598,7 +636,8 @@ export class TelegramConnection extends PersistentConnection { async sendForResult( message: T, - stack?: string + stack?: string, + timeout?: number, ): Promise { if (this._usable && this.params.inactivityTimeout) this._rescheduleInactivity() @@ -644,7 +683,8 @@ export class TelegramConnection extends PersistentConnection { return this._sendBufferForResult( message._, BinaryWriter.serializeObject(obj), - stack + stack, + timeout ) }