feat(core): support rpc timeouts

This commit is contained in:
teidesu 2021-05-16 02:52:13 +03:00
parent a6217c682d
commit 4852fe0301
2 changed files with 62 additions and 21 deletions

View file

@ -458,6 +458,7 @@ export class BaseTelegramClient {
params?: { params?: {
throwFlood?: boolean throwFlood?: boolean
connection?: TelegramConnection connection?: TelegramConnection
timeout?: number
} }
): Promise<tl.RpcCallReturn[T['_']]> { ): Promise<tl.RpcCallReturn[T['_']]> {
if (!this._connected) { if (!this._connected) {
@ -494,7 +495,7 @@ export class BaseTelegramClient {
for (let i = 0; i < this._rpcRetryCount; i++) { for (let i = 0; i < this._rpcRetryCount; i++) {
try { try {
const res = await connection.sendForResult(message, stack) const res = await connection.sendForResult(message, stack, params?.timeout)
await this._cachePeersFrom(res) await this._cachePeersFrom(res)
return res return res

View file

@ -16,9 +16,17 @@ import {
import { debounce } from '../utils/function-utils' import { debounce } from '../utils/function-utils'
import { bufferToBigInt, ulongToLong } from '../utils/bigint-utils' import { bufferToBigInt, ulongToLong } from '../utils/bigint-utils'
import { randomBytes } from '../utils/buffer-utils' import { randomBytes } from '../utils/buffer-utils'
import { BadRequestError, createRpcErrorFromTl } from '@mtcute/tl/errors' import { BadRequestError, createRpcErrorFromTl, RpcError, RpcTimeoutError, TimeoutError } from '@mtcute/tl/errors'
import { LruStringSet } from '../utils/lru-string-set' import { LruStringSet } from '../utils/lru-string-set'
function makeNiceStack(error: RpcError, stack: string, method?: string) {
error.stack = `${error.constructor.name} (${error.code} ${
error.text
}): ${error.message}\n at ${
method
}\n${stack.split('\n').slice(2).join('\n')}`
}
const _debug = require('debug') const _debug = require('debug')
const debug = _debug('mtcute:conn') const debug = _debug('mtcute:conn')
@ -56,6 +64,8 @@ interface PendingMessage {
stack?: string stack?: string
promise: ControllablePromise promise: ControllablePromise
message: Buffer message: Buffer
// timeout after which the call will be cancelled
cancel?: NodeJS.Timeout
} }
// TODO: error handling basically everywhere, most importantly (de-)serialization errors // TODO: error handling basically everywhere, most importantly (de-)serialization errors
@ -300,14 +310,11 @@ export class TelegramConnection extends PersistentConnection {
if (message.result._ === 'mt_rpc_error') { if (message.result._ === 'mt_rpc_error') {
const error = createRpcErrorFromTl(message.result) const error = createRpcErrorFromTl(message.result)
if (this.params.niceStacks !== false) { if (this.params.niceStacks !== false) {
error.stack = `${error.constructor.name} (${error.code} ${ makeNiceStack(error, pending.stack!, pending.method)
error.text
}): ${error.message}\n at ${
pending.method
}\n${pending.stack!.split('\n').slice(2).join('\n')}`
} }
pending.promise.reject(error) pending.promise.reject(error)
} else { } else {
if (pending.cancel) clearTimeout(pending.cancel)
pending.promise.resolve(message.result) pending.promise.resolve(message.result)
} }
@ -326,7 +333,10 @@ export class TelegramConnection extends PersistentConnection {
} }
if (this._pendingRpcCalls[msgId]) { if (this._pendingRpcCalls[msgId]) {
this._pendingRpcCalls[msgId].promise.resolve(message) const pending = this._pendingRpcCalls[msgId]
if (pending.cancel) clearTimeout(pending.cancel)
pending.promise.resolve(message)
delete this._pendingRpcCalls[msgId] delete this._pendingRpcCalls[msgId]
} else { } else {
debug('pong to unknown ping %o', message) debug('pong to unknown ping %o', message)
@ -380,12 +390,16 @@ export class TelegramConnection extends PersistentConnection {
// does not seem to happen, but whatever // does not seem to happen, but whatever
this._seqNo -= 16 this._seqNo -= 16
} else if (this._pendingRpcCalls[badMsgId]) { } else if (this._pendingRpcCalls[badMsgId]) {
this._pendingRpcCalls[badMsgId].promise.reject( const pending = this._pendingRpcCalls[badMsgId]
new BadRequestError(
'BAD_REQUEST', const error = new BadRequestError(
'bad_msg_notification ' + message.errorCode 'BAD_REQUEST',
) 'bad_msg_notification ' + message.errorCode
) )
if (this.params.niceStacks !== false) {
makeNiceStack(error, pending.stack!, pending.method)
}
pending.promise.reject(error)
delete this._pendingRpcCalls[badMsgId] delete this._pendingRpcCalls[badMsgId]
return return
} }
@ -402,7 +416,10 @@ export class TelegramConnection extends PersistentConnection {
message.msgIds.forEach((idLong) => { message.msgIds.forEach((idLong) => {
const id = idLong.toString(16) const id = idLong.toString(16)
if (this._pendingRpcCalls[id]?.method === 'auth.logOut') { if (this._pendingRpcCalls[id]?.method === 'auth.logOut') {
this._pendingRpcCalls[id].promise.resolve(true) const pending = this._pendingRpcCalls[id]
if (pending.cancel) clearTimeout(pending.cancel)
pending.promise.resolve(true)
delete this._pendingRpcCalls[id] delete this._pendingRpcCalls[id]
} }
}) })
@ -495,7 +512,10 @@ export class TelegramConnection extends PersistentConnection {
const reqMsgId = message.reqMsgId.toString(16) const reqMsgId = message.reqMsgId.toString(16)
if (reqMsgId in this._pendingRpcCalls) { if (reqMsgId in this._pendingRpcCalls) {
this._pendingRpcCalls[reqMsgId].promise.resolve(message) const pending = this._pendingRpcCalls[reqMsgId]
if (pending.cancel) clearTimeout(pending.cancel)
pending.promise.resolve(message)
} }
} }
@ -540,14 +560,16 @@ export class TelegramConnection extends PersistentConnection {
async _sendBufferForResult( async _sendBufferForResult(
method: string, method: string,
message: Buffer, message: Buffer,
stack?: string stack?: string,
timeout?: number,
): Promise<tl.TlObject> ): Promise<tl.TlObject>
async _sendBufferForResult( async _sendBufferForResult(
method: string | PendingMessage, method: string | PendingMessage,
message?: Buffer, message?: Buffer,
stack?: string stack?: string,
timeout?: number,
): Promise<tl.TlObject> { ): Promise<tl.TlObject> {
if (this.params.niceStacks !== false && !stack) { if (typeof method === 'string' && this.params.niceStacks !== false && !stack) {
stack = new Error().stack stack = new Error().stack
} }
@ -582,9 +604,25 @@ export class TelegramConnection extends PersistentConnection {
} }
const messageId = this._getMessageId() const messageId = this._getMessageId()
const messageIdStr = messageId.toString(16)
const seqNo = this._getSeqNo(true) const seqNo = this._getSeqNo(true)
this._pendingRpcCalls[messageId.toString(16)] = pending this._pendingRpcCalls[messageIdStr] = pending
if (timeout) {
pending.cancel = setTimeout(() => {
const pending = this._pendingRpcCalls[messageIdStr]
if (pending) {
const error = new RpcTimeoutError(timeout)
if (this.params.niceStacks !== false) {
makeNiceStack(error, pending.stack!, pending.method)
}
pending.promise.reject(error)
delete this._pendingRpcCalls[messageIdStr]
}
}, timeout)
}
const encrypted = await this._mtproto.encryptMessage( const encrypted = await this._mtproto.encryptMessage(
content, content,
@ -598,7 +636,8 @@ export class TelegramConnection extends PersistentConnection {
async sendForResult<T extends tl.RpcMethod>( async sendForResult<T extends tl.RpcMethod>(
message: T, message: T,
stack?: string stack?: string,
timeout?: number,
): Promise<tl.RpcCallReturn[T['_']]> { ): Promise<tl.RpcCallReturn[T['_']]> {
if (this._usable && this.params.inactivityTimeout) if (this._usable && this.params.inactivityTimeout)
this._rescheduleInactivity() this._rescheduleInactivity()
@ -644,7 +683,8 @@ export class TelegramConnection extends PersistentConnection {
return this._sendBufferForResult( return this._sendBufferForResult(
message._, message._,
BinaryWriter.serializeObject(obj), BinaryWriter.serializeObject(obj),
stack stack,
timeout
) )
} }