/* eslint-disable @typescript-eslint/no-explicit-any */ // will be reworked in MTQ-32
import Long from 'long'

import { mtp, tl } from '@mtcute/tl'
import {
    TlBinaryReader,
    TlBinaryWriter,
    TlReaderMap,
    TlSerializationCounter,
    TlWriterMap,
} from '@mtcute/tl-runtime'
import { gzipDeflate, gzipInflate } from '@mtcute/tl-runtime/src/platform/gzip'

import { MtArgumentError, MtcuteError, MtTimeoutError } from '../types'
import {
    ControllablePromise,
    createCancellablePromise,
    createControllablePromise,
    EarlyTimer,
    longFromBuffer,
    randomBytes,
    randomLong,
    removeFromLongArray,
} from '../utils'
import { createAesIgeForMessageOld } from '../utils/crypto/mtproto'
import { doAuthorization } from './authorization'
import { MtprotoSession, PendingMessage, PendingRpc } from './mtproto-session'
import {
    PersistentConnection,
    PersistentConnectionParams,
} from './persistent-connection'
import { TransportError } from './transports'

const TEMP_AUTH_KEY_EXPIRY = 86400

export interface SessionConnectionParams extends PersistentConnectionParams {
    initConnection: tl.RawInitConnectionRequest
    inactivityTimeout?: number
    niceStacks?: boolean
    layer: number
    disableUpdates?: boolean
    withUpdates?: boolean
    isMainConnection: boolean
    isMainDcConnection: boolean
    usePfs?: boolean

    readerMap: TlReaderMap
    writerMap: TlWriterMap
}

// destroy_auth_key#d1435160 = DestroyAuthKeyRes;
// const DESTROY_AUTH_KEY = Buffer.from('605134d1', 'hex')

function makeNiceStack(error: tl.RpcError, stack: string, method?: string) {
    error.stack = `RpcError (${error.code} ${error.text}): ${
        error.message
    }\n    at ${method}\n${stack.split('\n').slice(2).join('\n')}`
}

/**
 * A connection to a single DC.
 */
export class SessionConnection extends PersistentConnection {
    readonly params!: SessionConnectionParams

    private _flushTimer = new EarlyTimer()
    private _queuedDestroySession: Long[] = []

    // waitForMessage
    private _pendingWaitForUnencrypted: [
        ControllablePromise<Buffer>,
        NodeJS.Timeout,
    ][] = []

    private _usePfs = this.params.usePfs ?? false
    private _isPfsBindingPending = false
    private _isPfsBindingPendingInBackground = false
    private _pfsUpdateTimeout?: NodeJS.Timeout

    private _inactivityPendingFlush = false

    private _readerMap: TlReaderMap
    private _writerMap: TlWriterMap

    constructor(
        params: SessionConnectionParams,
        readonly _session: MtprotoSession,
    ) {
        super(params, _session.log.create('conn'))
        this._flushTimer.onTimeout(this._flush.bind(this))
        this._readerMap = params.readerMap
        this._writerMap = params.writerMap
        this._handleRawMessage = this._handleRawMessage.bind(this)
    }

    getAuthKey(temp = false): Buffer | null {
        const key = temp ? this._session._authKeyTemp : this._session._authKey

        if (!key.ready) return null

        return key.key
    }

    setUsePfs(usePfs: boolean): void {
        if (this._usePfs === usePfs) return

        this.log.debug('use pfs changed to %s', usePfs)
        this._usePfs = usePfs

        if (!usePfs) {
            this._isPfsBindingPending = false
            this._isPfsBindingPendingInBackground = false
            this._session._authKeyTemp.reset()
            clearTimeout(this._pfsUpdateTimeout)
        }

        this._resetSession()
    }

    onTransportClose(): void {
        super.onTransportClose()

        Object.values(this._pendingWaitForUnencrypted).forEach(
            ([prom, timeout]) => {
                prom.reject(new MtcuteError('Connection closed'))
                clearTimeout(timeout)
            },
        )

        this.emit('disconnect')

        this.reset()
    }

    destroy(): void {
        super.destroy()
        this.reset(true)
    }

    reset(forever = false): void {
        this._session.initConnectionCalled = false
        this._flushTimer.reset()

        if (forever) {
            this.removeAllListeners()
        }
    }

    onConnected(): void {
        // check if we have all the needed keys
        if (!this._session._authKey.ready) {
            if (!this.params.isMainConnection) {
                this.log.info('no auth key, waiting for main connection')

                // once it is done, we will be notified
                return
            }

            this.log.info('no perm auth key, authorizing...')
            this._authorize()

            // if we use pfs, we *could* also start temp key exchange here
            // but telegram restricts us to only have one auth session per connection,
            // and having a separate connection for pfs is not worth it
            return
        }

        if (this._usePfs && !this._session._authKeyTemp.ready) {
            this.log.info('no temp auth key but using pfs, authorizing')
            this._authorizePfs()

            return
        }

        this.log.info('auth keys are already available')
        this.onConnectionUsable()
    }

    protected onError(error: Error): void {
        // https://core.telegram.org/mtproto/mtproto-transports#transport-errors
        if (error instanceof TransportError) {
            if (error.code === 404) {
                // if we are using pfs, this could be due to the server
                // forgetting our temp key (which is kinda weird but expected)
                if (this._usePfs) {
                    if (
                        !this._isPfsBindingPending &&
                        this._session._authKeyTemp.ready
                    ) {
                        this.log.info('transport error 404, reauthorizing pfs')

                        // this is important! we must reset temp auth key before
                        // we proceed with new temp key derivation.
                        // otherwise, we can end up in an infinite loop in case it
                        // was actually perm_key that got 404-ed
                        //
                        // if temp key binding is already in process in background,
                        // _authorizePfs will mark it as foreground to prevent new
                        // queries from being sent (to avoid even more 404s)
                        this._session._authKeyTemp.reset()
                        this._authorizePfs()
                        this._onAllFailed('temp key expired, binding started')

                        return
                    } else if (this._isPfsBindingPending) {
                        this.log.info(
                            'transport error 404, pfs binding in progress',
                        )

                        this._onAllFailed('temp key expired, binding pending')

                        return
                    }

                    // otherwise, 404 must be referencing the perm_key
                    this.log.info('transport error 404, reauthorizing')
                }

                // there happened a little trolling
                this._session.reset(true)
                this.emit('key-change', null)
                this._authorize()

                return
            }

            this.log.error('transport error %d', error.code)
            // all pending queries must be resent
            this._onAllFailed(`transport error ${error.code}`)

            if (error.code === 429) {
                this._session.onTransportFlood(
                    this.emit.bind(this, 'flood-done'),
                )

                return
            }
        }

        this.emit('error', error)
    }

    protected onConnectionUsable() {
        super.onConnectionUsable()

        if (this.params.withUpdates) {
            // we must send some user-related rpc to the server to make sure that
            // it will send us updates
            this.sendRpc({ _: 'updates.getState' }).catch((err: any) => {
                this.log.warn(
                    'failed to send updates.getState: %s',
                    err.text || err.message,
                )
            })
        }

        // just in case
        this._flushTimer.emitBeforeNext(1000)
    }

    _authorize(): void {
        if (this._session.authorizationPending) {
            this.log.info('_authorize(): authorization already in progress')

            return
        }

        if (!this.params.isMainConnection) {
            // we don't authorize on non-main connections
            this.log.debug('_authorize(): non-main connection, requesting...')
            this.emit('request-auth')

            return
        }

        this._session.authorizationPending = true
        this.emit('auth-begin')

        doAuthorization(this, this.params.crypto)
            .then(async ([authKey, serverSalt, timeOffset]) => {
                await this._session._authKey.setup(authKey)
                this._session.serverSalt = serverSalt
                this._session._timeOffset = timeOffset

                this._session.authorizationPending = false

                this.emit('key-change', authKey)

                if (this._usePfs) {
                    return this._authorizePfs()
                }
                this.onConnectionUsable()
            })
            .catch((err: Error) => {
                this._session.authorizationPending = false
                this.log.error('Authorization error: %s', err.message)
                this.onError(err)
                this.reconnect()
            })
    }

    private _authorizePfs(background = false): void {
        if (this._isPfsBindingPending) return

        if (this._pfsUpdateTimeout) {
            clearTimeout(this._pfsUpdateTimeout)
            this._pfsUpdateTimeout = undefined
        }

        if (this._isPfsBindingPendingInBackground) {
            // e.g. temp key has expired while we were binding a key in the background
            // in this case, we shouldn't start pfs binding again, and instead wait for
            // current operation to complete
            this._isPfsBindingPendingInBackground = false
            this._isPfsBindingPending = true

            return
        }

        if (background) {
            this._isPfsBindingPendingInBackground = true
        } else {
            this._isPfsBindingPending = true
        }

        doAuthorization(this, this.params.crypto, TEMP_AUTH_KEY_EXPIRY)
            .then(async ([tempAuthKey, tempServerSalt]) => {
                if (!this._usePfs) {
                    this.log.info(
                        'pfs has been disabled while generating temp key',
                    )

                    return
                }

                const tempKey = this._session._authKeyTempSecondary
                await tempKey.setup(tempAuthKey)

                const msgId = this._session.getMessageId()

                this.log.debug(
                    'binding temp_auth_key (%h) to perm_auth_key (%h), msg_id = %l...',
                    tempKey.id,
                    this._session._authKey.id,
                    msgId,
                )

                // we now need to bind the key
                const inner: mtp.RawMt_bind_auth_key_inner = {
                    _: 'mt_bind_auth_key_inner',
                    nonce: randomLong(),
                    tempAuthKeyId: longFromBuffer(tempKey.id),
                    permAuthKeyId: longFromBuffer(this._session._authKey.id),
                    tempSessionId: this._session._sessionId,
                    expiresAt:
                        Math.floor(Date.now() / 1000) + TEMP_AUTH_KEY_EXPIRY,
                }

                // encrypt using mtproto v1 (fucking kill me plz)
                const writer = TlBinaryWriter.alloc(this.params.writerMap, 80)
                // = 40 (inner length) + 32 (mtproto header) + 8 (pad 72 so mod 16 = 0)
                writer.raw(randomBytes(16))
                writer.long(msgId)
                writer.int(0) // seq_no
                writer.int(40) // msg_len
                writer.object(inner)

                const msgWithoutPadding = writer.result()
                writer.raw(randomBytes(8))
                const msgWithPadding = writer.result()

                const hash = await this.params.crypto.sha1(msgWithoutPadding)
                const msgKey = hash.slice(4, 20)

                const ige = await createAesIgeForMessageOld(
                    this.params.crypto,
                    this._session._authKey.key,
                    msgKey,
                    true,
                )
                const encryptedData = await ige.encrypt(msgWithPadding)
                const encryptedMessage = Buffer.concat([
                    this._session._authKey.id,
                    msgKey,
                    encryptedData,
                ])

                const promise = createControllablePromise<
                    mtp.RawMt_rpc_error | boolean
                >()

                // encrypt the message using temp key and same msg id
                // this is a bit of a hack, but it works
                //
                // hacking inside main send loop to allow sending
                // with another key is just too much hassle.
                // we could just always use temp key if one is available,
                // but that way we won't be able to refresh the key
                // that is about to expire in the background without
                // interrupting actual message flow
                // decrypting is trivial though, since key id
                // is in the first bytes of the message, and is never used later on.
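                // register a pending 'bind' message under this msg_id, so that
                // _onRpcResult can recognize the raw auth.bindTempAuthKey
                // response and resolve the promise above instead of treating
                // it as a regular rpc query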
                this._session.pendingMessages.set(msgId, {
                    _: 'bind',
                    promise,
                })

                const request: tl.auth.RawBindTempAuthKeyRequest = {
                    _: 'auth.bindTempAuthKey',
                    permAuthKeyId: inner.permAuthKeyId,
                    nonce: inner.nonce,
                    expiresAt: inner.expiresAt,
                    encryptedMessage,
                }

                const reqSize = TlSerializationCounter.countNeededBytes(
                    this._writerMap,
                    request,
                )
                const reqWriter = TlBinaryWriter.alloc(
                    this._writerMap,
                    reqSize + 16,
                )
                reqWriter.long(this._registerOutgoingMsgId(msgId))
                reqWriter.uint(this._session.getSeqNo())
                reqWriter.uint(reqSize)
                reqWriter.object(request)

                // we can now send it as is
                const requestEncrypted = await tempKey.encryptMessage(
                    reqWriter.result(),
                    tempServerSalt,
                    this._session._sessionId,
                )
                await this.send(requestEncrypted)

                const res = await promise

                this._session.pendingMessages.delete(msgId)

                if (!this._usePfs) {
                    this.log.info(
                        'pfs has been disabled while binding temp key',
                    )

                    return
                }

                if (typeof res === 'object') {
                    this.log.error(
                        'failed to bind temp key: %s:%s',
                        res.errorCode,
                        res.errorMessage,
                    )
                    throw new MtcuteError('Failed to bind temporary key')
                }

                // now we can swap the keys (secondary becomes primary,
                // and primary is not immediately forgotten because messages
                // using it may still be in flight)
                this._session._authKeyTempSecondary = this._session._authKeyTemp
                this._session._authKeyTemp = tempKey
                this._session.serverSalt = tempServerSalt

                this.log.debug(
                    'temp key has been bound, exp = %d',
                    inner.expiresAt,
                )

                this._isPfsBindingPending = false
                this._isPfsBindingPendingInBackground = false

                // we must re-init connection after binding temp key
                this._session.initConnectionCalled = false
                this.emit('tmp-key-change', tempAuthKey, inner.expiresAt)
                this.onConnectionUsable()

                // set a timeout to update temp auth key in advance
                // to avoid interruption
                this._pfsUpdateTimeout = setTimeout(() => {
                    this._pfsUpdateTimeout = undefined
                    this.log.debug('temp key is expiring soon')
                    this._authorizePfs(true)
                }, (TEMP_AUTH_KEY_EXPIRY - 60) * 1000)
            })
            .catch((err: Error) => {
                this.log.error('PFS Authorization error: %s', err.message)

                if (this._isPfsBindingPendingInBackground) {
                    this._isPfsBindingPendingInBackground = false

                    // if we are in background, we can just retry
                    return this._authorizePfs(true)
                }

                this._isPfsBindingPending = false
                this.onError(err)
                this.reconnect()
            })
    }

    waitForUnencryptedMessage(timeout = 5000): Promise<Buffer> {
        const promise = createControllablePromise<Buffer>()
        const timeoutId = setTimeout(() => {
            promise.reject(new MtTimeoutError(timeout))
            this._pendingWaitForUnencrypted =
                this._pendingWaitForUnencrypted.filter(
                    (it) => it[0] !== promise,
                )
        }, timeout)
        this._pendingWaitForUnencrypted.push([promise, timeoutId])

        return promise
    }

    protected async onMessage(data: Buffer): Promise<void> {
        if (data.readInt32LE(0) === 0 && data.readInt32LE(4) === 0) {
            // auth_key_id = 0, meaning it's an unencrypted message
            // used for authorization
            if (this._pendingWaitForUnencrypted.length) {
                const [promise, timeout] =
                    this._pendingWaitForUnencrypted.shift()!
                clearTimeout(timeout)
                promise.resolve(data)
            } else {
                this.log.debug(
                    'unencrypted message received, but no one is waiting for it',
                )
            }

            return
        }

        if (!this._session._authKey.ready) {
            // if a message is received before authorization,
            // either the server is misbehaving,
            // or there was a problem with authorization.
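            // either way, we can't even decrypt it without a ready auth key,
            // so just log it and drop it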
            this.log.warn('received message before authorization: %h', data)

            return
        }

        try {
            await this._session.decryptMessage(data, this._handleRawMessage)
        } catch (err) {
            this.log.error('failed to decrypt message: %s\ndata: %h', err, data)
        }
    }

    private _handleRawMessage(
        messageId: Long,
        seqNo: number,
        message: TlBinaryReader,
    ): void {
        if (message.peekUint() === 0x3072cfa1) {
            // gzip_packed
            // we can't use message.gzip() because it may contain msg_container,
            // so we parse it manually.
            message.uint()

            return this._handleRawMessage(
                messageId,
                seqNo,
                new TlBinaryReader(
                    this._readerMap,
                    gzipInflate(message.bytes()),
                ),
            )
        }

        if (message.peekUint() === 0x73f1f8dc) {
            // msg_container
            message.uint()

            const count = message.uint()

            for (let i = 0; i < count; i++) {
                // msg_id:long seqno:int bytes:int
                const msgId = message.long()
                const seqNo = message.uint() // seqno
                const length = message.uint()

                // container can't contain other containers,
                // but can contain rpc_result
                const obj = message.raw(length)

                this._handleRawMessage(
                    msgId,
                    seqNo,
                    new TlBinaryReader(this._readerMap, obj),
                )
            }

            return
        }

        if (message.peekUint() === 0xf35c6d01) {
            // rpc_result
            message.uint()

            return this._onRpcResult(messageId, message)
        }

        // we are safe.. i guess
        this._handleMessage(messageId, message.object())
    }

    private _handleMessage(messageId: Long, message_: unknown): void {
        if (messageId.isEven()) {
            this.log.warn(
                'warn: ignoring message with invalid messageId = %s (is even)',
                messageId,
            )

            return
        }

        if (this._session.recentIncomingMsgIds.has(messageId)) {
            this.log.warn('warn: ignoring duplicate message %s', messageId)

            return
        }

        const message = message_ as mtp.TlObject

        this.log.debug('received %s (msg_id: %l)', message._, messageId)
        this._session.recentIncomingMsgIds.add(messageId)

        switch (message._) {
            case 'mt_msgs_ack':
            case 'mt_http_wait':
            case 'mt_bad_msg_notification':
            case 'mt_bad_server_salt':
            case 'mt_msgs_all_info':
            case 'mt_msgs_state_info':
            case 'mt_msg_detailed_info':
            case 'mt_msg_new_detailed_info':
                break
            default:
                this._sendAck(messageId)
        }

        switch (message._) {
            case 'mt_pong':
                this._onPong(message)
                break
            case 'mt_bad_server_salt':
                this._onBadServerSalt(message)
                break
            case 'mt_bad_msg_notification':
                this._onBadMsgNotification(messageId, message)
                break
            case 'mt_msgs_ack':
                message.msgIds.forEach((msgId) => this._onMessageAcked(msgId))
                break
            case 'mt_new_session_created':
                this._onNewSessionCreated(message)
                break
            case 'mt_msgs_all_info':
                this._onMessagesInfo(message.msgIds, message.info)
                break
            case 'mt_msg_detailed_info':
                this._onMessageInfo(
                    message.msgId,
                    message.status,
                    message.answerMsgId,
                )
                break
            case 'mt_msg_new_detailed_info':
                this._onMessageInfo(Long.ZERO, 0, message.answerMsgId)
                break
            case 'mt_msgs_state_info':
                this._onMsgsStateInfo(message)
                break
            case 'mt_future_salts':
                // todo
                break
            case 'mt_msgs_state_req':
            case 'mt_msg_resend_req':
                // tdlib doesn't handle them, so why should we? :upside_down_face:
                this.log.warn(
                    'received %s (msg_id = %l): %j',
                    message._,
                    messageId,
                    message,
                )
                break
            case 'mt_destroy_session_ok':
            case 'mt_destroy_session_none':
                this._onDestroySessionResult(message)
                break
            default:
                if (tl.isAnyUpdates(message)) {
                    if (this._usable && this.params.inactivityTimeout) {
                        this._rescheduleInactivity()
                    }

                    this.log.verbose('<<< %j', message)

                    if (this.params.disableUpdates) {
                        this.log.warn(
                            'received updates, but updates are disabled',
                        )
                        // likely due to some request in the session missing
                        // invokeWithoutUpdates
                        break
                    }

                    this.emit('update', message)

                    return
                }

                this.log.warn('unknown message received: %j', message)
        }
    }

    private _onRpcResult(messageId: Long, message: TlBinaryReader): void {
        if (this._usable && this.params.inactivityTimeout) {
            this._rescheduleInactivity()
        }

        const reqMsgId = message.long()

        if (reqMsgId.isZero()) {
            let resultType

            try {
                // eslint-disable-next-line
                resultType = (message.object() as any)._
            } catch (err) {
                resultType = message.peekUint()
            }

            this.log.warn(
                'received rpc_result with %j with req_msg_id = 0',
                resultType,
            )

            return
        }

        const msg = this._session.pendingMessages.get(reqMsgId)

        if (!msg) {
            let result

            try {
                // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
                result = message.object() as any
            } catch (err) {
                result = '[failed to parse]'
            }

            // check if the msg is one of the recent ones
            if (this._session.recentOutgoingMsgIds.has(reqMsgId)) {
                this.log.debug(
                    'received rpc_result again for %l (contains %j)',
                    reqMsgId,
                    result,
                )
            } else {
                this.log.warn(
                    'received rpc_result for unknown message %l: %j',
                    reqMsgId,
                    result,
                )
            }

            return
        }

        this._sendAck(messageId)

        // special case for auth key binding
        if (msg._ !== 'rpc') {
            if (msg._ === 'bind') {
                this._sendAck(messageId)
                msg.promise.resolve(message.object())

                return
            }

            this.log.error(
                'received rpc_result for %s request %l',
                msg._,
                reqMsgId,
            )

            return
        }

        const rpc = msg.rpc

        const customReader = this._readerMap._results![rpc.method]
        const result: any = customReader ?
            customReader(message) :
            message.object()

        // initConnection call was definitely received and
        // processed by the server, so we no longer need to use it
        if (rpc.initConn) {
            this._session.initConnectionCalled = true
        }

        this.log.verbose('<<< (%s) %j', rpc.method, result)

        if (result._ === 'mt_rpc_error') {
            const res = result as mtp.RawMt_rpc_error
            this.log.debug(
                'received rpc_error [%d:%s] for %l (%s)',
                res.errorCode,
                res.errorMessage,
                reqMsgId,
                rpc.method,
            )

            if (res.errorMessage === 'AUTH_KEY_PERM_EMPTY') {
                // happens when temp auth key is not yet bound
                // this shouldn't happen as we block any outbound communications
                // until the temp key is derived and bound.
                //
                // i think it is also possible for the error to be returned
                // when the temp key has expired, but this still shouldn't happen
                // but this is tg, so something may go wrong, and we will receive this as an error
                // (for god's sake why is this not in mtproto and instead hacked into the app layer)
                this._authorizePfs()
                this._onMessageFailed(reqMsgId, 'AUTH_KEY_PERM_EMPTY', true)

                return
            }

            if (res.errorMessage === 'CONNECTION_NOT_INITED') {
                // this seems to sometimes happen when using pfs
                // no idea why, but tdlib also seems to handle these, so whatever
                this._session.initConnectionCalled = false
                this._onMessageFailed(reqMsgId, res.errorMessage, true)

                // just setting this flag is not enough because the message
                // is already serialized, so we do this awesome hack
                this.sendRpc({ _: 'help.getNearestDc' })
                    .then(() => {
                        this.log.debug(
                            'additional help.getNearestDc for initConnection ok',
                        )
                    })
                    .catch((err) => {
                        this.log.debug(
                            'additional help.getNearestDc for initConnection error: %s',
                            err,
                        )
                    })

                return
            }

            if (rpc.cancelled) return

            const error = tl.RpcError.fromTl(res)

            if (this.params.niceStacks !== false) {
                makeNiceStack(error, rpc.stack!, rpc.method)
            }

            rpc.promise.reject(error)
        } else {
            this.log.debug(
                'received rpc_result (%s) for request %l (%s)',
                result._,
                reqMsgId,
                rpc.method,
            )

            if (rpc.cancelled) return

            rpc.promise.resolve(result)
        }

        this._onMessageAcked(reqMsgId)
        this._session.pendingMessages.delete(reqMsgId)
    }

    private _onMessageAcked(msgId: Long, inContainer = false): void {
        const msg = this._session.pendingMessages.get(msgId)

        if (!msg) {
            if (!this._session.recentOutgoingMsgIds.has(msgId)) {
                this.log.warn('received ack for unknown message %l', msgId)
            }

            return
        }

        switch (msg._) {
            case 'container':
                this.log.debug(
                    'received ack for container %l (size = %d)',
                    msgId,
                    msg.msgIds.length,
                )

                msg.msgIds.forEach((msgId) => this._onMessageAcked(msgId, true))

                // we no longer need info about the container
                this._session.pendingMessages.delete(msgId)
                break
            case 'rpc': {
                const rpc = msg.rpc
                this.log.debug(
                    'received ack for rpc query %l (%s, acked before = %s)',
                    msgId,
                    rpc.method,
                    rpc.acked,
                )

                rpc.acked = true

                if (
                    !inContainer &&
                    rpc.containerId &&
                    this._session.pendingMessages.has(rpc.containerId)
                ) {
                    // ack all the messages in that container
                    this._onMessageAcked(rpc.containerId)
                }

                // this message could also already be in some queue,
                removeFromLongArray(this._session.queuedStateReq, msgId)
                removeFromLongArray(this._session.queuedResendReq, msgId)
                // if resend/state was already requested, it will simply be ignored
                this._session.getStateSchedule.remove(rpc)
                break
            }
            case 'bind':
                break // do nothing, wait for the result
            default:
                if (!inContainer) {
                    this.log.warn(
                        'received unexpected ack for %s query %l',
                        msg._,
                        msgId,
                    )
                }
        }
    }

    private _onAllFailed(reason: string) {
        // called when all the pending messages are to be resent
        // e.g. when server returns 429
        // most service messages can be omitted as stale
        for (const msgId of this._session.pendingMessages.keys()) {
            const info = this._session.pendingMessages.get(msgId)!
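            // service messages (acks, state/resend requests, pings) are simply
            // dropped: if they are still relevant, the next flush will re-create
            // them. actual rpc queries are failed and re-enqueued below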
            switch (info._) {
                case 'container':
                case 'state':
                case 'resend':
                case 'ping':
                    // no longer relevant
                    this._session.pendingMessages.delete(msgId)
                    break
                default:
                    this._onMessageFailed(msgId, reason, true)
                    break
            }
        }
    }

    private _onMessageFailed(
        msgId: Long,
        reason: string,
        inContainer = false,
    ): void {
        const msgInfo = this._session.pendingMessages.get(msgId)

        if (!msgInfo) {
            this.log.debug(
                'unknown message %l failed because of %s',
                msgId,
                reason,
            )

            return
        }

        switch (msgInfo._) {
            case 'container':
                this.log.debug(
                    'container %l (size = %d) failed because of %s',
                    msgId,
                    msgInfo.msgIds.length,
                    reason,
                )
                msgInfo.msgIds.forEach((msgId) =>
                    this._onMessageFailed(msgId, reason, true),
                )
                break
            case 'ping':
                this.log.debug(
                    'ping (msg_id = %l) failed because of %s',
                    msgId,
                    reason,
                )
                // restart ping
                this._session.resetLastPing(true)
                break
            case 'rpc': {
                const rpc = msgInfo.rpc
                this.log.debug(
                    'rpc query %l (%s) failed because of %s',
                    msgId,
                    rpc.method,
                    reason,
                )

                // since the query was rejected, we can let it reassign msg_id
                // to avoid containers
                this._session.pendingMessages.delete(msgId)
                rpc.msgId = undefined
                this._enqueueRpc(rpc, true)

                if (
                    !inContainer &&
                    rpc.containerId &&
                    this._session.pendingMessages.has(rpc.containerId)
                ) {
                    // fail all the messages in that container
                    this._onMessageFailed(rpc.containerId, reason)
                }

                // this message could also already be in some queue,
                removeFromLongArray(this._session.queuedStateReq, msgId)
                removeFromLongArray(this._session.queuedResendReq, msgId)
                // if resend/state was already requested, it will simply be ignored
                this._session.getStateSchedule.remove(rpc)
                break
            }
            case 'resend':
                this.log.debug(
                    'resend request %l (size = %d) failed because of %s',
                    msgId,
                    msgInfo.msgIds.length,
                    reason,
                )
                this._session.queuedResendReq.splice(0, 0, ...msgInfo.msgIds)
                this._flushTimer.emitWhenIdle()
                break
            case 'state':
                this.log.debug(
                    'state request %l (size = %d) failed because of %s',
                    msgId,
                    msgInfo.msgIds.length,
                    reason,
                )
                this._session.queuedStateReq.splice(0, 0, ...msgInfo.msgIds)
                this._flushTimer.emitWhenIdle()
                break
            case 'bind':
                this.log.debug(
                    'temp key binding request %l failed because of %s, retrying',
                    msgId,
                    reason,
                )
                msgInfo.promise.reject(Error(reason))
        }

        this._session.pendingMessages.delete(msgId)
    }

    private _registerOutgoingMsgId(msgId: Long): Long {
        this._session.recentOutgoingMsgIds.add(msgId)

        return msgId
    }

    private _onPong({ msgId, pingId }: mtp.RawMt_pong): void {
        const info = this._session.pendingMessages.get(msgId)

        if (!info) {
            this.log.warn(
                'received pong to unknown ping (msg_id %l, ping_id %l)',
                msgId,
                pingId,
            )

            return
        }

        if (info._ !== 'ping') {
            this.log.warn(
                'received pong to %s query, not ping (msg_id %l, ping_id %l)',
                info._,
                msgId,
                pingId,
            )

            return
        }

        if (info.pingId.neq(pingId)) {
            this.log.warn(
                'received pong to %l, but expected ping_id = %l (got %l)',
                msgId,
                info.pingId,
                pingId,
            )
        }

        const rtt = Date.now() - this._session.lastPingTime
        this._session.lastPingRtt = rtt

        if (info.containerId.neq(msgId)) {
            this._onMessageAcked(info.containerId)
        }

        this.log.debug(
            'received pong: msg_id %l, ping_id %l, rtt = %dms',
            msgId,
            pingId,
            rtt,
        )

        this._session.resetLastPing()
    }

    private _onBadServerSalt(msg: mtp.RawMt_bad_server_salt): void {
        this._session.serverSalt = msg.newServerSalt

        this._onMessageFailed(msg.badMsgId, 'bad_server_salt')
    }

    private _onBadMsgNotification(
        msgId: Long,
        msg: mtp.RawMt_bad_msg_notification,
    ): void {
        switch (msg.errorCode) {
            case 16:
            case 17:
            case 20: {
                if (msg.errorCode !== 20) {
                    // msg_id is either too high or too low
                    // code 20 means msg_id is too old,
                    // we just need to resend the message
                    const serverTime = msgId.low >>> 0
                    const timeOffset =
                        Math.floor(Date.now() / 1000) - serverTime

                    this._session._timeOffset = timeOffset

                    this.log.debug(
                        'server time: %d, corrected offset to %d',
                        serverTime,
                        timeOffset,
                    )
                }

                this._onMessageFailed(
                    msg.badMsgId,
                    `bad_msg_notification ${msg.errorCode}`,
                )
                break
            }
            default:
                // something went very wrong, we need to reset the session
                this.log.error(
                    'received bad_msg_notification for msg_id = %l, code = %d. session will be reset',
                    msg.badMsgId,
                    msg.errorCode,
                )
                this._resetSession()
                break
        }
    }

    private _onNewSessionCreated({
        firstMsgId,
        serverSalt,
        uniqueId,
    }: mtp.RawMt_new_session_created): void {
        if (uniqueId.eq(this._session.lastSessionCreatedUid)) {
            this.log.debug(
                'received new_session_created with the same uid = %l, ignoring',
                uniqueId,
            )

            return
        }

        if (
            !this._session.lastSessionCreatedUid.isZero() &&
            !this.params.disableUpdates
        ) {
            // force the client to fetch missed updates
            // when _lastSessionCreatedUid == 0, the connection has
            // just been established, and the client will fetch them anyways
            this.emit('update', { _: 'updatesTooLong' })
        }

        this._session.serverSalt = serverSalt

        this.log.debug(
            'received new_session_created, uid = %l, first msg_id = %l',
            uniqueId,
            firstMsgId,
        )

        for (const msgId of this._session.pendingMessages.keys()) {
            const val = this._session.pendingMessages.get(msgId)!

            if (val._ === 'bind') {
                // should NOT happen.
                if (msgId.lt(firstMsgId)) {
                    this._onMessageFailed(msgId, 'received in wrong session')
                }
                continue
            }

            if (val._ === 'container') {
                if (msgId.lt(firstMsgId)) {
                    // all messages in this container will be resent
                    // info about this container is no longer needed
                    this._session.pendingMessages.delete(msgId)
                }
                continue
            }
            const containerId =
                val._ === 'rpc' ? val.rpc.containerId || msgId : val.containerId

            if (containerId.lt(firstMsgId)) {
                this._onMessageFailed(msgId, 'new_session_created', true)
            }
        }
    }

    private _onMessageInfo(
        msgId: Long,
        status: number,
        answerMsgId: Long,
    ): void {
        if (!msgId.isZero()) {
            const info = this._session.pendingMessages.get(msgId)

            if (!info) {
                if (!this._session.recentOutgoingMsgIds.has(msgId)) {
                    this.log.warn(
                        'received message info about unknown message %l',
                        msgId,
                    )
                }

                return
            }

            switch (status & 7) {
                case 1:
                case 2:
                case 3:
                    // message wasn't received by the server
                    this._onMessageFailed(msgId, `message info state ${status}`)
                    break
                case 0:
                    if (!answerMsgId.isZero()) {
                        this.log.warn(
                            'received message info with status = 0: msg_id = %l, status = %d, ans_id = %l',
                            msgId,
                            status,
                            answerMsgId,
                        )

                        return this._onMessageFailed(
                            msgId,
                            'message info state = 0, ans_id = 0',
                        )
                    }
                // fallthrough
                case 4:
                    this._onMessageAcked(msgId)
                    break
            }
        }

        if (
            !answerMsgId.isZero() &&
            !this._session.recentIncomingMsgIds.has(answerMsgId)
        ) {
            this.log.debug(
                'received message info for %l, but answer (%l) was not received yet',
                msgId,
                answerMsgId,
            )
            this._session.queuedResendReq.push(answerMsgId)
            this._flushTimer.emitWhenIdle()

            return
        }

        this.log.debug(
            'received message info for %l, and answer (%l) was already received',
            msgId,
            answerMsgId,
        )
    }

    private _onMessagesInfo(msgIds: Long[], info: Buffer): void {
        if (msgIds.length !== info.length) {
            this.log.warn(
                'messages state info was invalid: msg_ids.length !== info.length',
            )
        }

        for (let i = 0; i < msgIds.length; i++) {
            this._onMessageInfo(msgIds[i], info[i], Long.ZERO)
        }
    }

    private _onMsgsStateInfo(msg: mtp.RawMt_msgs_state_info): void {
        const info = this._session.pendingMessages.get(msg.reqMsgId)

        if (!info) {
            this.log.warn(
                'received msgs_state_info to unknown request %l',
                msg.reqMsgId,
            )

            return
        }

        if (info._ !== 'state') {
            this.log.warn(
                'received msgs_state_info to %s query %l',
                info._,
                msg.reqMsgId,
            )

            return
        }

        this._onMessagesInfo(info.msgIds, msg.info)
    }

    private _onDestroySessionResult(msg: mtp.TypeDestroySessionRes): void {
        const reqMsgId = this._session.destroySessionIdToMsgId.get(
            msg.sessionId,
        )

        if (!reqMsgId) {
            this.log.warn(
                'received %s for unknown session %h',
                msg._,
                msg.sessionId,
            )

            return
        }

        this._session.destroySessionIdToMsgId.delete(msg.sessionId)
        this._session.pendingMessages.delete(reqMsgId)

        this.log.debug('received %s for session %h', msg._, msg.sessionId)
    }

    private _enqueueRpc(rpc: PendingRpc, force?: boolean) {
        if (this._session.enqueueRpc(rpc, force)) {
            this._flushTimer.emitWhenIdle()
        }
    }

    _resetSession(): void {
        this._queuedDestroySession.push(this._session._sessionId)

        this._session.resetState(true)
        this.reconnect()

        // once we receive new_session_created, all pending messages will be resent.
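        // (the flush timer will be re-armed once the connection becomes
        // usable again)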
        this._flushTimer.reset()
    }

    private _sendAck(msgId: Long): void {
        if (this._session.queuedAcks.length === 0) {
            this._flushTimer.emitBeforeNext(30000)
        }

        this._session.queuedAcks.push(msgId)

        if (this._session.queuedAcks.length >= 100) {
            this._flushTimer.emitWhenIdle()
        }
    }

    sendRpc<T extends tl.RpcMethod>(
        request: T,
        stack?: string,
        timeout?: number,
    ): Promise<tl.RpcCallReturn[T['_']]> {
        if (this._usable && this.params.inactivityTimeout) {
            this._rescheduleInactivity()
        }

        if (!stack && this.params.niceStacks !== false) {
            stack = new Error().stack
        }

        const method = request._

        let obj: tl.TlObject = request
        let initConn = false

        if (this.params.disableUpdates) {
            obj = {
                _: 'invokeWithoutUpdates',
                query: obj,
            }
        }

        if (!this._session.initConnectionCalled) {
            // we will wrap every rpc call with initConnection
            // until one of the requests wrapped with it is
            // either acked or returns rpc_result
            this.log.debug(
                'wrapping %s with initConnection, layer: %d',
                method,
                this.params.layer,
            )
            const proxy = this._transport.getMtproxyInfo?.()
            obj = {
                _: 'invokeWithLayer',
                layer: this.params.layer,
                query: {
                    ...this.params.initConnection,
                    proxy,
                    query: obj,
                },
            }
            initConn = true
        }

        this.log.verbose('>>> %j', obj)

        let content = TlBinaryWriter.serializeObject(this._writerMap, obj)

        if (content.length > 1044404) {
            // if you send larger payloads, telegram will just close connection,
            // and since we resend them, it will get resent after reconnection and
            // that will be an endless loop of reconnections. we don't want that,
            // and payloads this large are usually a sign of an error in the code.
            throw new MtArgumentError(
                `Payload is too big (${content.length} > 1044404)`,
            )
        }

        // gzip
        let shouldGzip = content.length > 128

        if (content.length > 16384) {
            // test compression ratio for the middle part
            // if it is less than 0.9, then try to compress the whole request
            const middle = ~~((content.length - 1024) / 2)
            const gzipped = gzipDeflate(
                content.slice(middle, middle + 1024),
                0.9,
            )

            if (!gzipped) shouldGzip = false
        }

        if (shouldGzip) {
            const gzipped = gzipDeflate(content, 0.9)

            if (gzipped) {
                this.log.debug(
                    'gzipped %s (%db -> %db)',
                    method,
                    content.length,
                    gzipped.length,
                )

                content = gzipped
            } else {
                shouldGzip = false
            }
        }

        const pending: PendingRpc = {
            method,
            // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
            promise: undefined as any, // because we need the object to make a promise
            data: content,
            stack,
            // we will need to know size of gzip_packed overhead in _flush()
            gzipOverhead: shouldGzip ?
                4 + TlSerializationCounter.countBytesOverhead(content.length) :
                0,
            initConn,

            // setting them as well so jit can optimize stuff
            sent: undefined,
            getState: undefined,
            msgId: undefined,
            seqNo: undefined,
            containerId: undefined,
            acked: undefined,
            cancelled: undefined,
            timeout: undefined,
        }

        const promise = createCancellablePromise(
            this._cancelRpc.bind(this, pending),
        )
        pending.promise = promise

        if (timeout) {
            pending.timeout = setTimeout(
                this._cancelRpc,
                timeout,
                pending,
                true,
            )
        }

        this._enqueueRpc(pending, true)

        return promise
    }

    private _cancelRpc(rpc: PendingRpc, onTimeout = false): void {
        if (rpc.cancelled && !onTimeout) {
            throw new MtcuteError('RPC was already cancelled')
        }

        if (!onTimeout && rpc.timeout) {
            clearTimeout(rpc.timeout)
        }

        if (onTimeout) {
            // todo: replace with MtTimeoutError
            const error = new tl.RpcError(-503, 'Timeout')

            if (this.params.niceStacks !== false) {
                makeNiceStack(error, rpc.stack!, rpc.method)
            }

            rpc.promise.reject(error)
        }

        rpc.cancelled = true

        if (rpc.msgId) {
            this._session.queuedCancelReq.push(rpc.msgId)
            this._flushTimer.emitWhenIdle()
        } else {
            // in case rpc wasn't sent yet (or had some error),
            // we can simply remove it from queue
            this._session.queuedRpc.remove(rpc)
        }
    }

    protected _onInactivityTimeout() {
        // we should send all pending acks and other service messages
        // before dropping the connection
        if (!this._session.hasPendingMessages) {
            this.log.debug('no pending service messages, closing connection')
            super._onInactivityTimeout()

            return
        }

        this._inactivityPendingFlush = true
        this._flush()
    }

    flushWhenIdle(): void {
        this._flushTimer.emitWhenIdle()
    }

    private _flush(): void {
        if (
            !this._session._authKey.ready ||
            this._isPfsBindingPending ||
            this._session.current429Timeout
        ) {
            this.log.debug(
                'skipping flush, connection is not usable (auth key ready = %b, pfs binding pending = %b, 429 timeout = %b)',
                this._session._authKey.ready,
                this._isPfsBindingPending,
                Boolean(this._session.current429Timeout),
            )

            // it will be flushed once connection is usable
            return
        }

        try {
            this._doFlush()
        } catch (e: any) {
            this.log.error('flush error: %s', (e as Error).stack)
            // should not happen unless there's a bug in the code
        }

        // schedule next flush
        // if there are more queued requests, flush immediately
        // (they likely didn't fit into one message)
        if (this._session.hasPendingMessages) {
            // we schedule it on the next tick, so we can load-balance
            // between multiple connections using the same session
            this._flushTimer.emitWhenIdle()
        } else if (this._inactivityPendingFlush) {
            this.log.debug('pending messages sent, closing connection')
            this._flushTimer.reset()
            this._inactivityPendingFlush = false
            super._onInactivityTimeout()
        } else {
            this._flushTimer.emitBefore(this._session.lastPingTime + 60000)
        }
    }

    private _doFlush(): void {
        this.log.debug(
            'flushing send queue. queued rpc: %d',
            this._session.queuedRpc.length,
        )

        // oh bloody hell mate

        // total size & count
        let packetSize = 0
        let messageCount = 0

        // size & msg count that count towards container limit
        // these will be added to total later
        let containerMessageCount = 0
        let containerSize = 0

        let ackRequest: Buffer | null = null
        let ackMsgIds: Long[] | null = null

        let pingRequest: Buffer | null = null
        let pingId: Long | null = null
        let pingMsgId: Long | null = null

        let getStateRequest: Buffer | null = null
        let getStateMsgId: Long | null = null
        let getStateMsgIds: Long[] | null = null

        let resendRequest: Buffer | null = null
        let resendMsgId: Long | null = null
        let resendMsgIds: Long[] | null = null

        let cancelRpcs: Long[] | null = null
        let destroySessions: Long[] | null = null

        const now = Date.now()

        if (this._session.queuedAcks.length) {
            let acks = this._session.queuedAcks

            if (acks.length > 8192) {
                this._session.queuedAcks = acks.slice(8192)
                acks = acks.slice(0, 8192)
            } else {
                this._session.queuedAcks = []
            }

            const obj: mtp.RawMt_msgs_ack = {
                _: 'mt_msgs_ack',
                msgIds: acks,
            }
            ackMsgIds = obj.msgIds

            ackRequest = TlBinaryWriter.serializeObject(this._writerMap, obj)
            packetSize += ackRequest.length + 16
            messageCount += 1
        }

        const getStateTime = now + 1500

        if (now - this._session.lastPingTime > 60000) {
            if (!this._session.lastPingMsgId.isZero()) {
                this.log.warn(
                    "didn't receive pong for previous ping (msg_id = %l)",
                    this._session.lastPingMsgId,
                )
                this._session.pendingMessages.delete(
                    this._session.lastPingMsgId,
                )
            }

            pingId = randomLong()
            const obj: mtp.RawMt_ping = {
                _: 'mt_ping',
                pingId,
            }

            this._session.lastPingTime = Date.now()

            pingRequest = TlBinaryWriter.serializeObject(this._writerMap, obj)
            containerSize += pingRequest.length + 16
            containerMessageCount += 1
        }

        {
            if (this._session.queuedStateReq.length) {
                let ids = this._session.queuedStateReq

                if (ids.length > 8192) {
                    this._session.queuedStateReq = ids.slice(8192)
                    ids = ids.slice(0, 8192)
                } else {
                    this._session.queuedStateReq = []
                }
                getStateMsgIds = ids
            }

            const idx = this._session.getStateSchedule.index(
                { getState: now } as PendingRpc,
                true,
            )

            if (idx > 0) {
                const toGetState = this._session.getStateSchedule.raw.splice(
                    0,
                    idx,
                )
                if (!getStateMsgIds) getStateMsgIds = []
                toGetState.forEach((it) => getStateMsgIds!.push(it.msgId!))
            }

            if (getStateMsgIds) {
                const obj: mtp.RawMt_msgs_state_req = {
                    _: 'mt_msgs_state_req',
                    msgIds: getStateMsgIds,
                }

                getStateRequest = TlBinaryWriter.serializeObject(
                    this._writerMap,
                    obj,
                )
                packetSize += getStateRequest.length + 16
                messageCount += 1
            }
        }

        if (this._session.queuedResendReq.length) {
            resendMsgIds = this._session.queuedResendReq

            if (resendMsgIds.length > 8192) {
                this._session.queuedResendReq = resendMsgIds.slice(8192)
                resendMsgIds = resendMsgIds.slice(0, 8192)
            } else {
                this._session.queuedResendReq = []
            }

            const obj: mtp.RawMt_msg_resend_req = {
                _: 'mt_msg_resend_req',
                msgIds: resendMsgIds,
            }

            resendRequest = TlBinaryWriter.serializeObject(this._writerMap, obj)
            packetSize += resendRequest.length + 16
            messageCount += 1
        }

        if (this._session.queuedCancelReq.length) {
            containerMessageCount += this._session.queuedCancelReq.length
            containerSize += this._session.queuedCancelReq.length * 28
            cancelRpcs = this._session.queuedCancelReq
            this._session.queuedCancelReq = []
        }

        if (this._queuedDestroySession.length) {
            containerMessageCount += this._queuedDestroySession.length
            containerSize += this._queuedDestroySession.length * 28
            destroySessions = this._queuedDestroySession
            this._queuedDestroySession = []
        }

        let forceContainer = false
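        // pull queued rpc calls into this packet, staying within the
        // container limits enforced by the loop below (~2^15 bytes of
        // content and 1020 messages)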
        const rpcToSend: PendingRpc[] = []

        while (
            this._session.queuedRpc.length &&
            containerSize < 32768 && // 2^15
            containerMessageCount < 1020
        ) {
            const msg = this._session.queuedRpc.popFront()!
            if (msg.cancelled) continue

            // note: we don't check for <2^15 here
            // this is not documented, but large requests
            // (like upload.saveFilePart) *may* exceed that limit
            rpcToSend.push(msg)
            containerSize += msg.data.length + 16
            if (msg.gzipOverhead) containerSize += msg.gzipOverhead

            // if message was already assigned a msg_id,
            // we must wrap it in a container with a newer msg_id
            if (msg.msgId) forceContainer = true

            // having >1 upload.getFile within a container seems to cause flood_wait errors
            // also a crutch for load-balancing
            if (msg.method === 'upload.getFile') break
        }

        packetSize += containerSize
        messageCount += containerMessageCount + rpcToSend.length

        if (!messageCount) {
            this.log.debug('flush did not happen: nothing to flush')

            return
        }

        const useContainer = forceContainer || messageCount > 1
        if (useContainer) packetSize += 24 // 8 (msg_container) + 16 (mtproto header)

        const writer = TlBinaryWriter.alloc(this._writerMap, packetSize)

        if (useContainer) {
            // leave bytes for mtproto header (we'll write it later,
            // since we need seqno and msg_id to be larger than the content)
            writer.pos += 16
            writer.uint(0x73f1f8dc) // msg_container
            writer.uint(messageCount)
        }

        const otherPendings: Exclude<
            PendingMessage,
            { _: 'rpc' | 'container' | 'bind' }
        >[] = []

        if (ackRequest) {
            this._registerOutgoingMsgId(
                this._session.writeMessage(writer, ackRequest),
            )
        }

        if (pingRequest) {
            pingMsgId = this._registerOutgoingMsgId(
                this._session.writeMessage(writer, pingRequest),
            )
            this._session.lastPingMsgId = pingMsgId
            const pingPending: PendingMessage = {
                _: 'ping',
                pingId: pingId!,
                containerId: pingMsgId,
            }
            this._session.pendingMessages.set(pingMsgId, pingPending)
            otherPendings.push(pingPending)
        }

        if (getStateRequest) {
            getStateMsgId = this._registerOutgoingMsgId(
                this._session.writeMessage(writer, getStateRequest),
            )
            const getStatePending: PendingMessage = {
                _: 'state',
                msgIds: getStateMsgIds!,
                containerId: getStateMsgId,
            }
            this._session.pendingMessages.set(getStateMsgId, getStatePending)
            otherPendings.push(getStatePending)
        }

        if (resendRequest) {
            resendMsgId = this._registerOutgoingMsgId(
                this._session.writeMessage(writer, resendRequest),
            )
            const resendPending: PendingMessage = {
                _: 'resend',
                msgIds: resendMsgIds!,
                containerId: resendMsgId,
            }
            this._session.pendingMessages.set(resendMsgId, resendPending)
            otherPendings.push(resendPending)
        }

        if (cancelRpcs) {
            cancelRpcs.forEach((msgId) => {
                const cancelMsgId = this._registerOutgoingMsgId(
                    this._session.writeMessage(writer, {
                        _: 'mt_rpc_drop_answer',
                        reqMsgId: msgId,
                    }),
                )

                const pending: PendingMessage = {
                    _: 'cancel',
                    msgId,
                    containerId: cancelMsgId,
                }
                this._session.pendingMessages.set(cancelMsgId, pending)
                otherPendings.push(pending)
            })
        }

        if (destroySessions) {
            destroySessions.forEach((sessionId) => {
                const msgId = this._registerOutgoingMsgId(
                    this._session.writeMessage(writer, {
                        _: 'mt_destroy_session',
                        sessionId,
                    }),
                )

                const pending: PendingMessage = {
                    _: 'destroy_session',
                    sessionId,
                    containerId: msgId,
                }
                this._session.pendingMessages.set(msgId, pending)
                this._session.destroySessionIdToMsgId.set(sessionId, msgId)
                otherPendings.push(pending)
            })
        }

        for (let i = 0; i < rpcToSend.length; i++) {
            const msg = rpcToSend[i]
            // not using writeMessage here because we also need seqNo, and
            // i don't want to also return seqNo there because that would mean
            // extra object overhead
            if (!msg.msgId) {
                const msgId = this._session.getMessageId()
                const seqNo = this._session.getSeqNo()

                this.log.debug(
                    '%s: msg_id assigned %l, seqno: %d',
                    msg.method,
                    msgId,
                    seqNo,
                )

                msg.msgId = msgId
                msg.seqNo = seqNo
                this._session.pendingMessages.set(msgId, {
                    _: 'rpc',
                    rpc: msg,
                })
            } else {
                this.log.debug(
                    '%s: msg_id already assigned, reusing %l, seqno: %d',
                    msg.method,
                    msg.msgId,
                    msg.seqNo,
                )
            }

            // (re-)schedule get_state if needed
            if (msg.getState) {
                this._session.getStateSchedule.remove(msg)
            }
            if (!msg.acked) {
                msg.getState = getStateTime
                this._session.getStateSchedule.insert(msg)
            }

            writer.long(this._registerOutgoingMsgId(msg.msgId))
            writer.uint(msg.seqNo!)

            if (msg.gzipOverhead) {
                writer.uint(msg.data.length + msg.gzipOverhead)
                writer.uint(0x3072cfa1) // gzip_packed#3072cfa1
                writer.bytes(msg.data)
            } else {
                writer.uint(msg.data.length)
                writer.raw(msg.data)
            }

            msg.sent = true
        }

        if (useContainer) {
            // we now need to assign the container msg_id and seqno
            // we couldn't have assigned them earlier because mtproto
            // requires them to be >= than the contained messages
            // writer.pos is expected to be packetSize
            const containerId = this._session.getMessageId()
            writer.pos = 0
            writer.long(this._registerOutgoingMsgId(containerId))
            writer.uint(this._session.getSeqNo(false))
            writer.uint(packetSize - 16)
            writer.pos = packetSize

            const msgIds = []

            for (let i = 0; i < rpcToSend.length; i++) {
                const msg = rpcToSend[i]
                msg.containerId = containerId
                msgIds.push(msg.msgId!)
            }

            if (otherPendings.length) {
                otherPendings.forEach((msg) => {
                    msgIds.push(msg.containerId)
                    msg.containerId = containerId
                })
            }

            this._session.pendingMessages.set(containerId, {
                _: 'container',
                msgIds,
            })
        }

        const result = writer.result()
        // probably the easiest way lol
        const rootMsgId = new Long(result.readInt32LE(), result.readInt32LE(4))

        this.log.debug(
            'sending %d messages: size = %db, acks = %d (msg_id = %s), ping = %s (msg_id = %s), state_req = %s (msg_id = %s), resend = %s (msg_id = %s), rpc = %s, container = %s, root msg_id = %l',
            messageCount,
            packetSize,
            ackMsgIds?.length || 'false',
            ackMsgIds?.map((it) => it.toString()),
            Boolean(pingRequest),
            pingMsgId,
            getStateMsgIds?.map((it) => it.toString()) || 'false',
            getStateMsgId,
            resendMsgIds?.map((it) => it.toString()) || 'false',
            resendMsgId,
            rpcToSend.map((it) => it.method),
            useContainer,
            rootMsgId,
        )

        this._session
            .encryptMessage(result)
            .then((enc) => this.send(enc))
            .catch((err: Error) => {
                this.log.error(
                    'error while sending pending messages (root msg_id = %l): %s',
                    rootMsgId,
                    err.stack,
                )

                // put acks in the front so they are the first to be sent
                if (ackMsgIds) {
                    this._session.queuedAcks.splice(0, 0, ...ackMsgIds)
                }

                this._onMessageFailed(rootMsgId, 'unknown error')
            })
    }
}