mtcute/packages/core/src/network/session-connection.ts

1596 lines
49 KiB
TypeScript
Raw Normal View History

import {
PersistentConnection,
PersistentConnectionParams,
} from './persistent-connection'
import { mtp, tl } from '@mtcute/tl'
import { Logger, LongMap, randomLong, removeFromLongArray } from '../utils'
import { MtprotoSession } from './mtproto-session'
import { doAuthorization } from './authorization'
import { TransportError } from './transports'
import Long from 'long'
import { LruSet } from '../utils/lru-string-set'
import {
ControllablePromise,
createCancellablePromise,
} from '../utils/controllable-promise'
import { gzipDeflate, gzipInflate } from '@mtcute/tl-runtime/src/platform/gzip'
import { SortedArray } from '../utils/sorted-array'
import { EarlyTimer } from '../utils/early-timer'
import { Deque } from '../utils'
import { TlBinaryReader, TlBinaryWriter, TlReaderMap, TlSerializationCounter, TlWriterMap } from '@mtcute/tl-runtime'
export interface SessionConnectionParams extends PersistentConnectionParams {
initConnection: tl.RawInitConnectionRequest
inactivityTimeout?: number
niceStacks?: boolean
layer: number
disableUpdates?: boolean
readerMap: TlReaderMap
writerMap: TlWriterMap
}
interface PendingRpc {
method: string
data: Buffer
promise: ControllablePromise
stack?: string
gzipOverhead?: number
sent?: boolean
msgId?: Long
seqNo?: number
containerId?: Long
acked?: boolean
initConn?: boolean
getState?: number
cancelled?: boolean
timeout?: number
}
type PendingMessage =
| {
_: 'rpc'
rpc: PendingRpc
}
| {
_: 'container'
msgIds: Long[]
}
| {
_: 'state'
msgIds: Long[]
containerId: Long
}
| {
_: 'resend'
msgIds: Long[]
containerId: Long
}
| {
_: 'ping'
pingId: Long
containerId: Long
}
| {
_: 'destroy_session'
sessionId: Long
containerId: Long
}
| {
_: 'cancel'
msgId: Long
containerId: Long
}
| {
_: 'future_salts'
containerId: Long
}
// destroy_session#e7512126 session_id:long
// todo
const DESTROY_SESSION_ID = Buffer.from('262151e7', 'hex')
function makeNiceStack(error: tl.errors.RpcError, stack: string, method?: string) {
error.stack = `${error.constructor.name} (${error.code} ${error.text}): ${
error.message
}\n at ${method}\n${stack.split('\n').slice(2).join('\n')}`
}
let nextConnectionUid = 0
export class SessionConnection extends PersistentConnection {
readonly params!: SessionConnectionParams
private _uid = nextConnectionUid++
private _session: MtprotoSession
private _flushTimer = new EarlyTimer()
/// internal state ///
// recent msg ids
private _recentOutgoingMsgIds = new LruSet<Long>(1000, false, true)
private _recentIncomingMsgIds = new LruSet<Long>(1000, false, true)
// queues
private _queuedRpc = new Deque<PendingRpc>()
private _queuedAcks: Long[] = []
private _queuedStateReq: Long[] = []
private _queuedResendReq: Long[] = []
private _queuedCancelReq: Long[] = []
private _queuedDestroySession: Long[] = []
private _getStateSchedule = new SortedArray<PendingRpc>(
[],
(a, b) => a.getState! - b.getState!
)
// requests info
private _pendingMessages = new LongMap<PendingMessage>()
private _initConnectionCalled = false
private _lastPingRtt = NaN
private _lastPingTime = 0
private _lastPingMsgId = Long.ZERO
private _lastSessionCreatedUid = Long.ZERO
private _next429Timeout = 1000
private _current429Timeout?: NodeJS.Timeout
private _readerMap: TlReaderMap
private _writerMap: TlWriterMap
constructor(params: SessionConnectionParams, log: Logger) {
super(params, log.create('conn'))
this._updateLogPrefix()
this._session = new MtprotoSession(
params.crypto,
log.create('session'),
params.readerMap,
params.writerMap
)
this._flushTimer.onTimeout(this._flush.bind(this))
this._readerMap = params.readerMap
this._writerMap = params.writerMap
this._handleRawMessage = this._handleRawMessage.bind(this)
}
private _updateLogPrefix() {
this.log.prefix = `[UID ${this._uid}, DC ${this.params.dc.id}] `
}
async changeDc(dc: tl.RawDcOption, authKey?: Buffer): Promise<void> {
this.log.debug('dc changed (has_auth_key = %b) to: %j', authKey, dc)
this._updateLogPrefix()
this._session.reset()
await this._session.setupKeys(authKey)
this.params.dc = dc
this.reconnect()
}
setupKeys(authKey: Buffer | null): Promise<void> {
return this._session.setupKeys(authKey)
}
getAuthKey(): Buffer | undefined {
return this._session._authKey
}
onTransportClose(): void {
super.onTransportClose()
this.emit('disconnect')
this.reset()
}
destroy(): void {
super.destroy()
this.reset(true)
}
reset(forever = false): void {
this._initConnectionCalled = false
this._resetLastPing(true)
this._flushTimer.reset()
clearTimeout(this._current429Timeout!)
if (forever) {
// reset all the queues, cancel all pending messages, etc
this._session.reset()
for (const info of this._pendingMessages.values()) {
if (info._ === 'rpc') {
info.rpc.promise.reject(new Error('Connection destroyed'))
}
}
this._pendingMessages.clear()
this._recentOutgoingMsgIds.clear()
this._recentIncomingMsgIds.clear()
while (this._queuedRpc.length) {
const rpc = this._queuedRpc.popFront()!
if (rpc.sent === false) {
rpc.promise.reject(new Error('Connection destroyed'))
}
}
this._queuedAcks.length = 0
this._queuedStateReq.length = 0
this._queuedResendReq.length = 0
this._getStateSchedule.clear()
this.removeAllListeners()
}
}
protected async onConnected(): Promise<void> {
if (this._session.authorized) {
this.onConnectionUsable()
} else {
this._authorize()
}
}
protected onError(error: Error): void {
// https://core.telegram.org/mtproto/mtproto-_transports#_transport-errors
if (error instanceof TransportError) {
this.log.error('transport error %d', error.code)
if (error.code === 404) {
this._session.reset()
this.emit('key-change', null)
this._authorize()
return
}
if (error.code === 429) {
// all active queries must be resent
const timeout = this._next429Timeout
this._next429Timeout = Math.min(this._next429Timeout * 2, 16000)
clearTimeout(this._current429Timeout!)
this._current429Timeout = setTimeout(() => {
this._current429Timeout = undefined
this._flushTimer.emitNow()
}, timeout)
this.log.debug(
'transport flood, waiting for %d ms before proceeding',
timeout
)
for (const msgId of this._pendingMessages.keys()) {
const info = this._pendingMessages.get(msgId)!
if (info._ === 'container') {
this._pendingMessages.delete(msgId)
} else {
this._onMessageFailed(msgId, 'transport flood', true)
}
}
return
}
}
this.emit('error', error)
}
protected onConnectionUsable() {
super.onConnectionUsable()
// just in case
this._flushTimer.emitBeforeNext(1000)
}
private _authorize(): void {
doAuthorization(this, this.params.crypto)
.then(async ([authKey, serverSalt, timeOffset]) => {
await this._session.setupKeys(authKey)
this._session.serverSalt = serverSalt
this._session._timeOffset = timeOffset
this.emit('key-change', authKey)
this.onConnectionUsable()
})
.catch((err) => {
this.log.error('Authorization error: %s', err.message)
this.onError(err)
this.reconnect()
})
}
protected async onMessage(data: Buffer): Promise<void> {
if (!this._session.authorized) {
// if a message is received before authorization,
// either the server is misbehaving,
// or there was a problem with authorization.
this.log.warn('received message before authorization: %h', data)
return
}
try {
await this._session.decryptMessage(data, this._handleRawMessage)
} catch (err) {
this.log.error('failed to decrypt message: %s\ndata: %h', err, data)
}
}
private _handleRawMessage(
messageId: Long,
seqNo: number,
message: TlBinaryReader
): void {
if (message.peekUint() === 0x3072cfa1) {
// gzip_packed
// we can't use message.gzip() because it may contain msg_container,
// so we parse it manually.
message.uint()
return this._handleRawMessage(
messageId,
seqNo,
new TlBinaryReader(
this._readerMap,
gzipInflate(message.bytes())
)
)
}
if (message.peekUint() === 0x73f1f8dc) {
// msg_container
message.uint()
const count = message.uint()
for (let i = 0; i < count; i++) {
// msg_id:long seqno:int bytes:int
const msgId = message.long()
message.uint() // seqno
const length = message.uint()
// container can't contain other containers, so we are safe
const start = message.pos
const obj = message.object()
// ensure length
if (message.pos - start !== length) {
this.log.warn(
'received message with invalid length in container (%d != %d)',
message.pos - start,
length
)
}
this._handleMessage(msgId, obj)
}
return
}
// we are safe.. i guess
this._handleMessage(messageId, message.object())
}
private _handleMessage(
messageId: Long,
message: mtp.TlObject
): void {
if (messageId.isEven()) {
this.log.warn(
'warn: ignoring message with invalid messageId = %s (is even)',
messageId
)
return
}
2022-05-18 13:04:40 +03:00
if (this._recentIncomingMsgIds.has(messageId)) {
this.log.warn('warn: ignoring duplicate message %s', messageId)
return
}
this.log.verbose('received %s (msg_id: %s)', message._, messageId)
this._recentIncomingMsgIds.add(messageId)
switch (message._) {
case 'mt_msgs_ack':
case 'mt_http_wait':
case 'mt_bad_msg_notification':
case 'mt_bad_server_salt':
case 'mt_msgs_all_info':
case 'mt_msgs_state_info':
case 'mt_msg_detailed_info':
case 'mt_msg_new_detailed_info':
break
default:
this._sendAck(messageId)
}
switch (message._) {
case 'mt_rpc_result':
this._onRpcResult(message)
break
case 'mt_pong':
this._onPong(message)
break
case 'mt_bad_server_salt':
this._onBadServerSalt(message)
break
case 'mt_bad_msg_notification':
this._onBadMsgNotification(messageId, message)
break
case 'mt_msgs_ack':
message.msgIds.forEach((msgId) => this._onMessageAcked(msgId))
break
case 'mt_new_session_created':
this._onNewSessionCreated(message)
break
case 'mt_msgs_all_info':
this._onMessagesInfo(message.msgIds, message.info)
break
case 'mt_msg_detailed_info':
this._onMessageInfo(
message.msgId,
message.status,
message.answerMsgId
)
break
case 'mt_msg_new_detailed_info':
this._onMessageInfo(Long.ZERO, 0, message.answerMsgId)
break
case 'mt_msgs_state_info':
this._onMsgsStateInfo(message)
break
case 'mt_future_salts':
// todo
break
case 'mt_msgs_state_req':
case 'mt_msg_resend_req':
// tdlib doesnt handle them, so why should we? :upside_down_face:
this.log.warn(
'received %s (msg_id = %l): %j',
message._,
messageId,
message
)
break
default:
if (tl.isAnyUpdates(message)) {
if (this._usable && this.params.inactivityTimeout)
this._rescheduleInactivity()
this.emit('update', message)
return
}
this.log.warn('unknown message received: %j', message)
}
}
private _onRpcResult({ result, reqMsgId }: mtp.RawMt_rpc_result): void {
if (this._usable && this.params.inactivityTimeout)
this._rescheduleInactivity()
if (reqMsgId.isZero()) {
this.log.warn(
'received rpc_result with %s with req_msg_id = 0',
result._
)
return
}
const msg = this._pendingMessages.get(reqMsgId)
if (!msg) {
// check if the msg is one of the recent ones
if (this._recentOutgoingMsgIds.has(reqMsgId)) {
this.log.debug(
'received rpc_result again for %l (contains %s)',
reqMsgId,
result._
)
} else {
this.log.warn(
'received rpc_result for unknown message %l: %j',
reqMsgId,
result
)
}
return
}
if (msg._ !== 'rpc') {
this.log.error(
'received rpc_result for %s request %l',
msg._,
reqMsgId
)
return
}
const rpc = msg.rpc
// initConnection call was definitely received and
// processed by the server, so we no longer need to use it
if (rpc.initConn) this._initConnectionCalled = true
this.log.verbose('<<< (%s) %j', rpc.method, result)
if (result._ === 'mt_rpc_error') {
const res = result as mtp.RawMt_rpc_error
this.log.debug(
'received rpc_error [%d:%s] for %l (%s)',
res.errorCode,
res.errorMessage,
reqMsgId,
rpc.method
)
if (rpc.cancelled) return
const error = tl.errors.createRpcErrorFromTl(res)
if (this.params.niceStacks !== false) {
makeNiceStack(error, rpc.stack!, rpc.method)
}
rpc.promise.reject(error)
} else {
this.log.debug(
'received rpc_result (%s) for request %l (%s)',
result._,
reqMsgId,
rpc.method
)
if (rpc.cancelled) return
rpc.promise.resolve(result)
}
this._onMessageAcked(reqMsgId)
this._pendingMessages.delete(reqMsgId)
}
private _onMessageAcked(msgId: Long, inContainer = false): void {
const msg = this._pendingMessages.get(msgId)
if (!msg) {
this.log.warn('received ack for unknown message %l', msgId)
return
}
switch (msg._) {
case 'container':
this.log.debug(
'received ack for container %l (size = %d)',
msgId,
msg.msgIds.length
)
msg.msgIds.forEach((msgId) => this._onMessageAcked(msgId, true))
// we no longer need info about the container
this._pendingMessages.delete(msgId)
break
case 'rpc': {
const rpc = msg.rpc
this.log.debug(
'received ack for rpc query %l (%s, acked before = %s)',
msgId,
rpc.method,
rpc.acked
)
rpc.acked = true
if (
!inContainer &&
rpc.containerId &&
this._pendingMessages.has(rpc.containerId)
) {
// ack all the messages in that container
this._onMessageAcked(rpc.containerId)
}
// this message could also already be in some queue,
removeFromLongArray(this._queuedStateReq, msgId)
removeFromLongArray(this._queuedResendReq, msgId)
// if resend/state was already requested, it will simply be ignored
this._getStateSchedule.remove(rpc)
break
}
default:
if (!inContainer) {
this.log.warn(
'received unexpected ack for %s query %l',
msg._,
msgId
)
}
}
}
private _onMessageFailed(
msgId: Long,
reason: string,
inContainer = false
): void {
const msgInfo = this._pendingMessages.get(msgId)
if (!msgInfo) {
this.log.debug(
'unknown message %l failed because of %s',
msgId,
reason
)
return
}
switch (msgInfo._) {
case 'container':
this.log.debug(
'container %l (size = %d) failed because of %s',
msgId,
msgInfo.msgIds.length,
reason
)
msgInfo.msgIds.forEach((msgId) =>
this._onMessageFailed(msgId, reason, true)
)
break
case 'ping':
this.log.debug(
'ping (msg_id = %l) failed because of %s',
msgId,
reason
)
// restart ping
this._resetLastPing(true)
break
case 'rpc': {
const rpc = msgInfo.rpc
this.log.debug(
'rpc query %l (%s) failed because of %s',
msgId,
rpc.method,
reason
)
// since the query was rejected, we can let it reassign msg_id to avoid containers
this._pendingMessages.delete(msgId)
rpc.msgId = undefined
this._enqueueRpc(rpc, true)
if (
!inContainer &&
rpc.containerId &&
this._pendingMessages.has(rpc.containerId)
) {
// fail all the messages in that container
this._onMessageFailed(rpc.containerId, reason)
}
// this message could also already be in some queue,
removeFromLongArray(this._queuedStateReq, msgId)
removeFromLongArray(this._queuedResendReq, msgId)
// if resend/state was already requested, it will simply be ignored
this._getStateSchedule.remove(rpc)
break
}
case 'resend':
this.log.debug(
'resend request %l (size = %d) failed because of %s',
msgId,
msgInfo.msgIds.length,
reason
)
this._queuedResendReq.splice(0, 0, ...msgInfo.msgIds)
this._flushTimer.emitWhenIdle()
break
case 'state':
this.log.debug(
'state request %l (size = %d) failed because of %s',
msgId,
msgInfo.msgIds.length,
reason
)
this._queuedStateReq.splice(0, 0, ...msgInfo.msgIds)
this._flushTimer.emitWhenIdle()
break
}
this._pendingMessages.delete(msgId)
}
private _resetLastPing(withTime = false): void {
if (withTime) this._lastPingTime = 0
if (!this._lastPingMsgId.isZero()) {
this._pendingMessages.delete(this._lastPingMsgId)
}
this._lastPingMsgId = Long.ZERO
}
private _registerOutgoingMsgId(msgId: Long): Long {
this._recentOutgoingMsgIds.add(msgId)
return msgId
}
private _onPong({ msgId, pingId }: mtp.RawMt_pong): void {
const info = this._pendingMessages.get(msgId)
if (!info) {
this.log.warn(
'received pong to unknown ping (msg_id %l, ping_id %l)',
msgId,
pingId
)
return
}
if (info._ !== 'ping') {
this.log.warn(
'received pong to %s query, not ping (msg_id %l, ping_id %l)',
info._,
msgId,
pingId
)
return
}
if (info.pingId.neq(pingId)) {
this.log.warn(
'received pong to %l, but expected ping_id = %l (got %l)',
msgId,
info.pingId,
pingId
)
}
const rtt = Date.now() - this._lastPingTime
this._lastPingRtt = rtt
if (info.containerId.neq(msgId)) {
this._onMessageAcked(info.containerId)
}
this.log.debug(
'received pong: msg_id %l, ping_id %l, rtt = %dms',
msgId,
pingId,
rtt
)
this._resetLastPing()
}
private _onBadServerSalt(msg: mtp.RawMt_bad_server_salt): void {
this._session.serverSalt = msg.newServerSalt
this._onMessageFailed(msg.badMsgId, 'bad_server_salt')
}
private _onBadMsgNotification(
msgId: Long,
msg: mtp.RawMt_bad_msg_notification
): void {
switch (msg.errorCode) {
case 16:
case 17:
case 20: {
if (msg.errorCode !== 20) {
// msg_id is either too high or too low
// code 20 means msg_id is too old,
// we just need to resend the message
const serverTime = msgId.low >>> 0
const timeOffset =
Math.floor(Date.now() / 1000) - serverTime
this._session._timeOffset = timeOffset
this.log.debug(
'server time: %d, corrected offset to %d',
serverTime,
timeOffset
)
}
this._onMessageFailed(
msg.badMsgId,
`bad_msg_notification ${msg.errorCode}`
)
break
}
default:
// something went very wrong, we need to reset the session
this.log.error(
'received bad_msg_notification for msg_id = %l, code = %d. session will be reset'
)
this._resetSession()
break
}
}
private _onNewSessionCreated({
firstMsgId,
serverSalt,
uniqueId,
}: mtp.RawMt_new_session_created): void {
if (uniqueId.eq(this._lastSessionCreatedUid)) {
this.log.debug(
'received new_session_created with the same uid = %l, ignoring',
uniqueId
)
return
}
if (
!this._lastSessionCreatedUid.isZero() &&
!this.params.disableUpdates
) {
// force the client to fetch missed updates
// when _lastSessionCreatedUid == 0, the connection has
// just been established, and the client will fetch them anyways
this.emit('update', { _: 'updatesTooLong' })
}
this._session.serverSalt = serverSalt
this.log.debug(
'received new_session_created, uid = %l, first msg_id = %l',
uniqueId,
firstMsgId
)
for (const msgId of this._pendingMessages.keys()) {
const val = this._pendingMessages.get(msgId)!
if (val._ === 'container') {
if (msgId.lt(firstMsgId)) {
// all messages in this container will be resent
// info about this container is no longer needed
this._pendingMessages.delete(msgId)
}
return
}
const containerId =
val._ === 'rpc' ? val.rpc.containerId || msgId : val.containerId
if (containerId.lt(firstMsgId)) {
this._onMessageFailed(msgId, 'new_session_created', true)
}
}
}
private _onMessageInfo(
msgId: Long,
status: number,
answerMsgId: Long
): void {
if (!msgId.isZero()) {
const info = this._pendingMessages.get(msgId)
if (!info) {
this.log.info(
'received message info about unknown message %l',
msgId
)
return
}
switch (status & 7) {
case 1:
case 2:
case 3:
// message wasn't received by the server
this._onMessageFailed(msgId, `message info state ${status}`)
break
case 0:
if (!answerMsgId.isZero()) {
this.log.warn(
'received message info with status = 0: msg_id = %l, status = %d, ans_id = %l',
msgId,
status,
answerMsgId
)
return this._onMessageFailed(
msgId,
`message info state = 0, ans_id = 0`
)
}
// fallthrough
case 4:
this._onMessageAcked(msgId)
break
}
}
if (
!answerMsgId.isZero() &&
!this._recentIncomingMsgIds.has(answerMsgId)
) {
this.log.debug(
'received message info for %l, but answer (%l) was not received yet',
msgId,
answerMsgId
)
this._queuedResendReq.push(answerMsgId)
this._flushTimer.emitWhenIdle()
return
}
this.log.debug(
'received message info for %l, and answer (%l) was already received',
msgId,
answerMsgId
)
}
private _onMessagesInfo(msgIds: Long[], info: Buffer): void {
if (msgIds.length !== info.length) {
this.log.warn(
'messages state info was invalid: msg_ids.length !== info.length'
)
}
for (let i = 0; i < msgIds.length; i++) {
this._onMessageInfo(msgIds[i], info[i], Long.ZERO)
}
}
private _onMsgsStateInfo(msg: mtp.RawMt_msgs_state_info): void {
const info = this._pendingMessages.get(msg.reqMsgId)
if (!info) {
this.log.warn(
'received msgs_state_info to unknown request %l',
msg.reqMsgId
)
return
}
if (info._ !== 'state') {
this.log.warn(
'received msgs_state_info to %s query %l',
info._,
msg.reqMsgId
)
return
}
this._onMessagesInfo(info.msgIds, msg.info)
}
private _enqueueRpc(rpc: PendingRpc, force?: boolean) {
// already queued or cancelled
if ((!force && !rpc.sent) || rpc.cancelled) return
rpc.sent = false
rpc.containerId = undefined
this.log.debug(
'enqueued %s for sending (msg_id = %s)',
rpc.method,
rpc.msgId || 'n/a'
)
this._queuedRpc.pushBack(rpc)
this._flushTimer.emitWhenIdle()
}
_resetSession(): void {
this._queuedDestroySession.push(this._session._sessionId)
this.reconnect()
this._session.changeSessionId()
this.log.debug('session reset, new sid = %l', this._session._sessionId)
// once we receive new_session_created, all pending messages will be resent.
// clear getState/resend queues because they are not needed anymore
this._queuedStateReq.length = 0
this._queuedResendReq.length = 0
this._flushTimer.reset()
}
private _sendAck(msgId: Long): void {
if (this._queuedAcks.length === 0) {
this._flushTimer.emitBeforeNext(30000)
}
this._queuedAcks.push(msgId)
if (this._queuedAcks.length >= 100) {
this._flushTimer.emitNow()
}
}
sendRpc<T extends tl.RpcMethod>(
request: T,
stack?: string,
timeout?: number
): Promise<tl.RpcCallReturn[T['_']]> {
if (this._usable && this.params.inactivityTimeout)
this._rescheduleInactivity()
if (!stack && this.params.niceStacks !== false) {
stack = new Error().stack
}
const method = request._
let obj: tl.TlObject = request
let initConn = false
if (this.params.disableUpdates) {
obj = {
_: 'invokeWithoutUpdates',
query: obj,
}
}
if (!this._initConnectionCalled) {
// we will wrap every rpc call with initConnection
// until some of the requests wrapped with it is
// either acked or returns rpc_result
this.log.debug(
'wrapping %s with initConnection, layer: %d',
method,
this.params.layer
)
obj = {
_: 'invokeWithLayer',
layer: this.params.layer,
query: {
...this.params.initConnection,
query: obj,
},
}
initConn = true
}
this.log.verbose('>>> %j', obj)
let content = TlBinaryWriter.serializeObject(this._writerMap,obj)
if (content.length > 1044404) {
// if you send larger payloads, telegram will just close connection,
// and since we resend them, it will get resent after reconnection and
// that will be an endless loop of reconnections. we don't want that,
// and payloads this large are usually a sign of an error in the code.
throw new Error(`Payload is too big (${content.length} > 1044404)`)
}
// gzip
let shouldGzip = content.length > 128
if (content.length > 16384) {
// test compression ratio for the middle part
// if it is less than 0.9, then try to compress the whole request
const middle = ~~((content.length - 1024) / 2)
const gzipped = gzipDeflate(
content.slice(middle, middle + 1024),
0.9
)
if (!gzipped) shouldGzip = false
}
if (shouldGzip) {
const gzipped = gzipDeflate(content, 0.9)
if (gzipped) {
this.log.debug(
'gzipped %s (%db -> %db)',
method,
content.length,
gzipped.length
)
content = gzipped
} else {
shouldGzip = false
}
}
const pending: PendingRpc = {
method,
promise: undefined as any, // because we need the object to make a promise
data: content,
stack,
// we will need to know size of gzip_packed overhead in _flush()
gzipOverhead: shouldGzip
? 4 + TlSerializationCounter.countBytesOverhead(content.length)
: 0,
initConn,
// setting them as well so jit can optimize stuff
sent: undefined,
getState: undefined,
msgId: undefined,
seqNo: undefined,
containerId: undefined,
acked: undefined,
cancelled: undefined,
timeout: undefined,
}
const promise = createCancellablePromise(
this._cancelRpc.bind(this, pending)
)
pending.promise = promise
if (timeout) {
pending.timeout = setTimeout(
this._cancelRpc,
timeout,
pending,
true
)
}
this._enqueueRpc(pending, true)
return promise
}
private _cancelRpc(rpc: PendingRpc, onTimeout = false): void {
if (rpc.cancelled && !onTimeout) {
throw new Error('RPC was already cancelled')
}
if (!onTimeout && rpc.timeout) {
clearTimeout(rpc.timeout)
}
if (onTimeout) {
const error = new tl.errors.RpcTimeoutError()
if (this.params.niceStacks !== false) {
makeNiceStack(error, rpc.stack!, rpc.method)
}
rpc.promise.reject(error)
}
rpc.cancelled = true
if (rpc.msgId) {
this._queuedCancelReq.push(rpc.msgId)
this._flushTimer.emitWhenIdle()
} else {
// in case rpc wasn't sent yet (or had some error),
// we can simply remove it from queue
this._queuedRpc.remove(rpc)
}
}
private _flush(): void {
if (!this._session.authorized || this._current429Timeout) {
// it will be flushed once connection is usable
return
}
try {
this._doFlush()
} catch (e: any) {
this.log.error('flush error: %s', e.stack)
// should not happen unless there's a bug in the code
}
// schedule next flush
// if there are more queued requests, flush immediately
// (they likely didn't fit into one message)
if (
this._queuedRpc.length ||
this._queuedAcks.length ||
this._queuedStateReq.length ||
this._queuedResendReq.length
) {
this._flush()
} else {
this._flushTimer.emitBefore(this._lastPingTime + 60000)
}
}
private _doFlush(): void {
this.log.debug(
'flushing send queue. queued rpc: %d',
this._queuedRpc.length
)
// oh bloody hell mate
// total size & count
let packetSize = 0
let messageCount = 0
// size & msg count that count towards container limit
// these will be added to total later
let containerMessageCount = 0
let containerSize = 0
let ackRequest: Buffer | null = null
let ackMsgIds: Long[] | null = null
let pingRequest: Buffer | null = null
let pingId: Long | null = null
let pingMsgId: Long | null = null
let getStateRequest: Buffer | null = null
let getStateMsgId: Long | null = null
let getStateMsgIds: Long[] | null = null
let resendRequest: Buffer | null = null
let resendMsgId: Long | null = null
let resendMsgIds: Long[] | null = null
let cancelRpcs: Long[] | null = null
let destroySessions: Long[] | null = null
const now = Date.now()
if (this._queuedAcks.length) {
let acks = this._queuedAcks
if (acks.length > 8192) {
this._queuedAcks = acks.slice(8192)
acks = acks.slice(0, 8192)
} else {
this._queuedAcks = []
}
const obj: mtp.RawMt_msgs_ack = {
_: 'mt_msgs_ack',
msgIds: acks,
}
ackMsgIds = obj.msgIds
ackRequest = TlBinaryWriter.serializeObject(this._writerMap, obj)
packetSize += ackRequest.length + 16
messageCount += 1
}
const getStateTime = now + 1500
if (now - this._lastPingTime > 60000) {
if (!this._lastPingMsgId.isZero()) {
this.log.warn(
"didn't receive pong for previous ping (msg_id = %l)",
this._lastPingMsgId
)
this._pendingMessages.delete(this._lastPingMsgId)
}
pingId = randomLong()
const obj: mtp.RawMt_ping = {
_: 'mt_ping',
pingId,
}
this._lastPingTime = Date.now()
pingRequest = TlBinaryWriter.serializeObject(this._writerMap, obj)
containerSize += pingRequest.length + 16
containerMessageCount += 1
}
{
if (this._queuedStateReq.length) {
let ids = this._queuedStateReq
if (ids.length > 8192) {
this._queuedStateReq = ids.slice(8192)
ids = ids.slice(0, 8192)
} else {
this._queuedStateReq = []
}
getStateMsgIds = ids
}
const idx = this._getStateSchedule.index(
{ getState: now } as any,
true
)
if (idx > 0) {
const toGetState = this._getStateSchedule.raw.splice(0, idx)
if (!getStateMsgIds) getStateMsgIds = []
toGetState.forEach((it) => getStateMsgIds!.push(it.msgId!))
}
if (getStateMsgIds) {
const obj: mtp.RawMt_msgs_state_req = {
_: 'mt_msgs_state_req',
msgIds: getStateMsgIds,
}
getStateRequest = TlBinaryWriter.serializeObject(this._writerMap, obj)
packetSize += getStateRequest.length + 16
messageCount += 1
}
}
if (this._queuedResendReq.length) {
resendMsgIds = this._queuedResendReq
if (resendMsgIds.length > 8192) {
this._queuedResendReq = resendMsgIds.slice(8192)
resendMsgIds = resendMsgIds.slice(0, 8192)
} else {
this._queuedResendReq = []
}
const obj: mtp.RawMt_msg_resend_req = {
_: 'mt_msg_resend_req',
msgIds: resendMsgIds,
}
resendRequest = TlBinaryWriter.serializeObject(this._writerMap, obj)
packetSize += resendRequest.length + 16
messageCount += 1
}
if (this._queuedCancelReq.length) {
containerMessageCount += this._queuedCancelReq.length
containerSize += this._queuedCancelReq.length * 28
cancelRpcs = this._queuedCancelReq
this._queuedCancelReq = []
}
if (this._queuedDestroySession.length) {
containerMessageCount += this._queuedCancelReq.length
containerSize += this._queuedCancelReq.length * 28
destroySessions = this._queuedDestroySession
this._queuedDestroySession = []
}
let forceContainer = false
const rpcToSend: PendingRpc[] = []
while (
this._queuedRpc.length &&
containerSize < 32768 && // 2^15
containerMessageCount < 1020
) {
const msg = this._queuedRpc.popFront()!
if (msg.cancelled) continue
// note: we don't check for <2^15 here
// this is not documented, but large requests
// (like upload.saveFilePart) *may* exceed that limit
rpcToSend.push(msg)
containerSize += msg.data.length + 16
if (msg.gzipOverhead) containerSize += msg.gzipOverhead
// if message was already assigned a msg_id,
// we must wrap it in a container with a newer msg_id
if (msg.msgId) forceContainer = true
}
packetSize += containerSize
messageCount += containerMessageCount + rpcToSend.length
if (!messageCount) {
this.log.debug('flush failed: nothing to flush')
return
}
const useContainer = forceContainer || messageCount > 1
if (useContainer) packetSize += 24 // 8 (msg_container) + 16 (mtproto header)
const writer = TlBinaryWriter.alloc(this._writerMap, packetSize)
if (useContainer) {
// leave bytes for mtproto header (we'll write it later,
// since we need seqno and msg_id to be larger than the content)
writer.pos += 16
writer.uint(0x73f1f8dc) // msg_container
writer.uint(messageCount)
}
const otherPendings: Exclude<
PendingMessage,
{ _: 'rpc' | 'container' }
>[] = []
if (ackRequest)
this._registerOutgoingMsgId(
this._session.writeMessage(writer, ackRequest)
)
if (pingRequest) {
pingMsgId = this._registerOutgoingMsgId(
this._session.writeMessage(writer, pingRequest)
)
this._lastPingMsgId = pingMsgId
const pingPending: PendingMessage = {
_: 'ping',
pingId: pingId!,
containerId: pingMsgId,
}
this._pendingMessages.set(pingMsgId, pingPending)
otherPendings.push(pingPending)
}
if (getStateRequest) {
getStateMsgId = this._registerOutgoingMsgId(
this._session.writeMessage(writer, getStateRequest)
)
const getStatePending: PendingMessage = {
_: 'state',
msgIds: getStateMsgIds!,
containerId: getStateMsgId,
}
this._pendingMessages.set(getStateMsgId, getStatePending)
otherPendings.push(getStatePending)
}
if (resendRequest) {
resendMsgId = this._registerOutgoingMsgId(
this._session.writeMessage(writer, resendRequest)
)
const resendPending: PendingMessage = {
_: 'resend',
msgIds: resendMsgIds!,
containerId: resendMsgId,
}
this._pendingMessages.set(resendMsgId, resendPending)
otherPendings.push(resendPending)
}
if (cancelRpcs) {
cancelRpcs.forEach((msgId) => {
const cancelMsgId = this._registerOutgoingMsgId(
this._session.writeMessage(writer, {
_: 'mt_rpc_drop_answer',
reqMsgId: msgId,
})
)
const pending: PendingMessage = {
_: 'cancel',
msgId,
containerId: cancelMsgId,
}
this._pendingMessages.set(cancelMsgId, pending)
otherPendings.push(pending)
})
}
if (destroySessions) {
destroySessions.forEach((sessionId) => {
const msgId = this._registerOutgoingMsgId(
this._session.writeMessage(writer, {
_: 'mt_destroy_session',
sessionId,
})
)
const pending: PendingMessage = {
_: 'destroy_session',
sessionId,
containerId: msgId,
}
this._pendingMessages.set(msgId, pending)
otherPendings.push(pending)
})
}
for (let i = 0; i < rpcToSend.length; i++) {
const msg = rpcToSend[i]
// not using writeMessage here because we also need seqNo, and
// i dont want to also return seqNo there because that would mean
// extra object overhead
if (!msg.msgId) {
const msgId = this._session.getMessageId()
const seqNo = this._session.getSeqNo()
this.log.debug(
'%s: msg_id assigned %l, seqno: %d',
msg.method,
msgId,
seqNo
)
msg.msgId = msgId
msg.seqNo = seqNo
this._pendingMessages.set(msgId, {
_: 'rpc',
rpc: msg,
})
} else {
this.log.debug(
'%s: msg_id already assigned, reusing %l, seqno: %d',
msg.method,
msg.msgId,
msg.seqNo
)
}
// (re-)schedule get_state if needed
if (msg.getState) {
this._getStateSchedule.remove(msg)
}
if (!msg.acked) {
msg.getState = getStateTime
this._getStateSchedule.insert(msg)
}
writer.long(this._registerOutgoingMsgId(msg.msgId))
writer.uint(msg.seqNo!)
if (msg.gzipOverhead) {
writer.uint(msg.data.length + msg.gzipOverhead)
writer.uint(0x3072cfa1) // gzip_packed#3072cfa1
writer.bytes(msg.data)
} else {
writer.uint(msg.data.length)
writer.raw(msg.data)
}
msg.sent = true
}
if (useContainer) {
// we now need to assign the container msg_id and seqno
// we couldn't have assigned them earlier because mtproto
// requires them to be >= than the contained messages
// writer.pos is expected to be packetSize
const containerId = this._session.getMessageId()
writer.pos = 0
writer.long(this._registerOutgoingMsgId(containerId))
writer.uint(this._session.getSeqNo(false))
writer.uint(packetSize - 16)
writer.pos = packetSize
const msgIds = []
for (let i = 0; i < rpcToSend.length; i++) {
const msg = rpcToSend[i]
msg.containerId = containerId
msgIds.push(msg.msgId!)
}
if (otherPendings.length) {
otherPendings.forEach((msg) => {
msgIds.push(msg.containerId)
msg.containerId = containerId
})
}
this._pendingMessages.set(containerId, { _: 'container', msgIds })
}
const result = writer.result()
// probably the easiest way lol
const rootMsgId = new Long(result.readInt32LE(), result.readInt32LE(4))
this.log.debug(
'sending %d messages: size = %db, acks = %d (msg_id = %s), ping = %s (msg_id = %s), state_req = %s (msg_id = %s), resend = %s (msg_id = %s), rpc = %s, container = %s, root msg_id = %l',
messageCount,
packetSize,
ackMsgIds?.length || 'false',
ackMsgIds?.map((it) => it.toString()),
!!pingRequest,
pingMsgId,
getStateMsgIds?.map((it) => it.toString()) || 'false',
getStateMsgId,
resendMsgIds?.map((it) => it.toString()) || 'false',
resendMsgId,
rpcToSend.map((it) => it.method),
useContainer,
rootMsgId
)
this._session
.encryptMessage(result)
.then((enc) => this.send(enc))
.catch((err) => {
this.log.error(
'error while sending pending messages (root msg_id = %l): %s',
rootMsgId,
err.stack
)
// put acks in the front so they are the first to be sent
if (ackMsgIds) this._queuedAcks.splice(0, 0, ...ackMsgIds)
this._onMessageFailed(rootMsgId, 'unknown error')
})
}
}