/* eslint-disable max-depth,max-params */ import { tl } from '@mtcute/tl' import { MtArgumentError } from '../../types/errors.js' import { assertNever, MaybeAsync } from '../../types/utils.js' import { AsyncLock, ConditionVariable, Deque, EarlyTimer, getBarePeerId, getMarkedPeerId, parseMarkedPeerId, SortedLinkedList, toggleChannelIdMark, toInputChannel, } from '../../utils/index.js' import { BaseTelegramClient } from '../base.js' import { CurrentUserInfo } from '../storage/service/current-user.js' import { PeersIndex } from '../types/peers/peers-index.js' import { PendingUpdate, PendingUpdateContainer, RawUpdateHandler, UpdatesManagerParams } from './types.js' import { createDummyUpdatesContainer, extractChannelIdFromUpdate, isMessageEmpty, messageToUpdate, toPendingUpdate, } from './utils.js' // code in this file is very bad, thanks to Telegram's awesome updates mechanism // todo: maybe move to another class? // /** // * Enable RPS meter. // * // * > **Note**: This may have negative impact on performance // * // * @param size Sampling size // * @param time Window time // */ // export function enableRps(client: ITelegramClient, size?: number, time?: number): void { // const state = getState(client) // state.rpsIncoming = new RpsMeter(size, time) // state.rpsProcessing = new RpsMeter(size, time) // } // /** // * Get current average incoming RPS // * // * Incoming RPS is calculated based on // * incoming update containers. Normally, // * they should be around the same, except // * rare situations when processing rps // * may peak. // */ // export function getCurrentRpsIncoming(client: ITelegramClient): number { // const state = getState(client) // if (!state.rpsIncoming) { // throw new MtArgumentError('RPS meter is not enabled, use .enableRps() first') // } // return state.rpsIncoming.getRps() // } // /** // * Get current average processing RPS // * // * Processing RPS is calculated based on // * dispatched updates. 
// Normally,
//  * they should be around the same, except
//  * rare situations when processing rps
//  * may peak.
//  */
// export function getCurrentRpsProcessing(client: ITelegramClient): number {
//     const state = getState(client)
//     if (!state.rpsProcessing) {
//         throw new MtArgumentError('RPS meter is not enabled, use .enableRps() first')
//     }
//     return state.rpsProcessing.getRps()
// }

/**
 * Period of the keep-alive timer armed by `startLoop`.
 * Every tick queues an `updatesTooLong` container (see `_onKeepAlive`).
 */
const KEEP_ALIVE_INTERVAL = 15 * 60 * 1000 // 15 minutes

// todo: fix docs
/**
 * Queues incoming update containers for the updates loop and keeps the
 * local updates state (`pts`/`qts`/`date`/`seq` and per-channel pts).
 */
export class UpdatesManager {
    /** `true` between `startLoop` and `stopLoop` */
    updatesLoopActive = false
    /** notified when a container is queued, a postponed timer fires or the loop is stopped */
    updatesLoopCv = new ConditionVariable()

    postponedTimer = new EarlyTimer()
    /** set by the `postponedTimer` timeout callback registered in the constructor */
    hasTimedoutPostponed = false

    // NOTE(review): the element types below were restored from how the queues are
    // used in this file (`PendingUpdateContainer` for containers, `PendingUpdate`
    // for everything else) - confirm against `./types.js`
    pendingUpdateContainers = new SortedLinkedList<PendingUpdateContainer>((a, b) => a.seqStart - b.seqStart)
    pendingPtsUpdates = new SortedLinkedList<PendingUpdate>((a, b) => a.ptsBefore! - b.ptsBefore!)
    pendingPtsUpdatesPostponed = new SortedLinkedList<PendingUpdate>((a, b) => a.ptsBefore! - b.ptsBefore!)
    pendingQtsUpdates = new SortedLinkedList<PendingUpdate>((a, b) => a.qtsBefore! - b.qtsBefore!)
    pendingQtsUpdatesPostponed = new SortedLinkedList<PendingUpdate>((a, b) => a.qtsBefore! - b.qtsBefore!)
    pendingUnorderedUpdates = new Deque<PendingUpdate>()

    // NOTE(review): this initializer (like `catchUpOnStart` and `log` below) reads a
    // constructor parameter property; that only works while parameter properties are
    // assigned before field initializers run - confirm the compiler settings
    noDispatchEnabled = !this.params.disableNoDispatch
    // channel id or 0 => msg id
    noDispatchMsg = new Map<number, Set<number>>()
    // channel id or 0 => pts
    noDispatchPts = new Map<number, Set<number>>()
    noDispatchQts = new Set<number>()

    lock = new AsyncLock()
    // rpsIncoming?: RpsMeter
    // rpsProcessing?: RpsMeter

    // accessing storage every time might be expensive,
    // so store everything here, and load & save
    // every time session is loaded & saved.
    pts?: number
    qts?: number
    date?: number
    seq?: number

    // old values of the updates state (i.e. as in DB)
    // used to avoid redundant storage calls
    oldPts?: number
    oldQts?: number
    oldDate?: number
    oldSeq?: number
    selfChanged = false // todo what is this?

    // whether to catch up channels from the locally stored pts
    catchUpChannels = false
    catchUpOnStart = this.params.catchUp ?? false

    /** channel id => last known channel pts */
    cpts = new Map<number, number>()
    /** channel id => pts, only for channels whose pts changed since the last save */
    cptsMod = new Map<number, number>()
    /** channel id => timer of the next scheduled channel difference fetch */
    channelDiffTimeouts = new Map<number, NodeJS.Timeout>()
    /** channel id => how many times the channel is currently "opened" */
    channelsOpened = new Map<number, number>()

    log = this.client.log.create('updates')
    private _handler: RawUpdateHandler = () => {}

    auth?: CurrentUserInfo | null // todo: do we need a local copy?
    keepAliveInterval?: NodeJS.Timeout

    /**
     * @param client  Client this manager belongs to
     * @param params  Updates manager parameters
     * @throws MtArgumentError  If updates are disabled in the client parameters
     */
    constructor(
        readonly client: BaseTelegramClient,
        readonly params: UpdatesManagerParams = {},
    ) {
        if (client.params.disableUpdates) {
            throw new MtArgumentError('Updates must be enabled to use updates manager')
        }

        // bound once so that the same function can be handed to `setInterval`
        this._onKeepAlive = this._onKeepAlive.bind(this)

        this.postponedTimer.onTimeout(() => {
            this.hasTimedoutPostponed = true
            this.updatesLoopCv.notify()
        })
    }

    /** Replace the raw update handler (a no-op by default). */
    setHandler(handler: RawUpdateHandler): void {
        this._handler = handler
    }

    /** Stop the updates loop. */
    destroy(): void {
        this.stopLoop()
    }

    /**
     * Remember the logged-in user, fetch the updates state and then start the loop.
     * Failures are reported through `client.emitError`.
     */
    notifyLoggedIn(self: CurrentUserInfo): void {
        this.auth = self
        this._fetchUpdatesState()
            .then(() => this.startLoop())
            .catch((err) => this.client.emitError(err))
    }

    /** Stop the loop and forget both the common and the per-channel updates state. */
    notifyLoggedOut(): void {
        this.stopLoop()
        this.cpts.clear()
        this.cptsMod.clear()
        this.pts = this.qts = this.date = this.seq = undefined
    }

    /** Load the previously stored updates state (if any) from the storage. */
    async prepare(): Promise<void> {
        await this._loadUpdatesStorage()
    }

    /** Keep-alive tick: queue an `updatesTooLong` container to force a catch-up. */
    private _onKeepAlive() {
        this.log.debug('no updates for >15 minutes, catching up')
        this.handleUpdate({ _: 'updatesTooLong' })
    }

    /**
     * Start updates loop.
     *
     * You must first call {@link enableUpdatesProcessing} to use this method.
     *
     * It is recommended to use this method in callback to {@link start},
     * or otherwise make sure the user is logged in.
     *
     * > **Note**: If you are using {@link UpdatesManagerParams.catchUp} option,
     * > catching up will be done in background, you can't await it.
*/
    async startLoop(): Promise<void> {
        if (this.updatesLoopActive) return

        // otherwise we will catch up on the first update
        if (!this.catchUpOnStart) {
            await this._fetchUpdatesState()

            // another startLoop() call may have started the loop while we were
            // awaiting; starting it twice would run two loops and leak an interval
            if (this.updatesLoopActive) return
        }

        // start updates loop in background
        this.updatesLoopActive = true
        this.keepAliveInterval = setInterval(this._onKeepAlive, KEEP_ALIVE_INTERVAL)
        this._loop().catch((err) => this.client.emitError(err))

        if (this.catchUpOnStart) {
            this.catchUp()
        }
    }

    /**
     * **ADVANCED**
     *
     * Manually stop updates loop.
     * Usually done automatically when stopping the client with {@link close}
     */
    stopLoop(): void {
        if (!this.updatesLoopActive) return

        clearInterval(this.keepAliveInterval)
        this.keepAliveInterval = undefined

        for (const timer of this.channelDiffTimeouts.values()) {
            clearTimeout(timer)
        }
        this.channelDiffTimeouts.clear()

        this.updatesLoopActive = false
        // drop everything that is still queued
        this.pendingUpdateContainers.clear()
        this.pendingUnorderedUpdates.clear()
        this.pendingPtsUpdates.clear()
        this.pendingQtsUpdates.clear()
        this.pendingPtsUpdatesPostponed.clear()
        this.pendingQtsUpdatesPostponed.clear()
        this.postponedTimer.reset()
        // wake up whoever is waiting on the condition variable
        this.updatesLoopCv.notify()
    }

    /**
     * Catch up with the server by loading missed updates.
     *
     * > **Note**: In case the storage was not properly
     * > closed the last time, "catching up" might
     * > result in duplicate updates.
     */
    catchUp(): void {
        this.log.debug('catch up requested')

        this.catchUpChannels = true
        this.handleUpdate({ _: 'updatesTooLong' })
    }

    /**
     * Queue an update container that originates from the client itself.
     *
     * @param update  Update container to queue
     * @param noDispatch
     *     Whether to record the contained updates in the no-dispatch indexes first
     *     (only has an effect while `noDispatchEnabled` is set)
     */
    handleClientUpdate(update: tl.TypeUpdates, noDispatch = true): void {
        if (noDispatch && this.noDispatchEnabled) {
            this._addToNoDispatchIndex(update)
        }

        this.handleUpdate(update)
    }

    /**
     * Put an update container into the containers queue and wake up the loop.
     *
     * Containers that carry no `seq` are queued with `seqStart = seqEnd = 0`.
     */
    handleUpdate(update: tl.TypeUpdates): void {
        this.log.debug(
            'received %s, queueing for processing. containers queue size: %d',
            update._,
            this.pendingUpdateContainers.length,
        )
        // this.rpsIncoming?.hit()

        switch (update._) {
            case 'updatesTooLong':
            case 'updateShortMessage':
            case 'updateShortChatMessage':
            case 'updateShort':
            case 'updateShortSentMessage':
                this.pendingUpdateContainers.add({
                    upd: update,
                    seqStart: 0,
                    seqEnd: 0,
                })
                break
            case 'updates':
            case 'updatesCombined':
                this.pendingUpdateContainers.add({
                    upd: update,
                    seqStart: update._ === 'updatesCombined' ? update.seqStart : update.seq,
                    seqEnd: update.seq,
                })
                break
            default:
                assertNever(update)
        }

        this.updatesLoopCv.notify()
    }

    /**
     * **ADVANCED**
     *
     * Notify the updates manager that some channel was "opened".
     * Channel difference for "opened" channels will be fetched on a regular basis.
     * This is a low-level method, prefer using {@link openChat} instead.
     *
     * Channel must be resolve-able with `resolvePeer` method (i.e. be in cache);
     * base chat PTS must either be passed (e.g. from {@link Dialog}), or cached in storage.
     *
     * @param channelId  Bare ID of the channel
     * @param pts  PTS of the channel, if known (e.g. from {@link Dialog})
     * @returns  `true` if the channel was opened for the first time, `false` if it is already opened
     */
    notifyChannelOpened(channelId: number, pts?: number): boolean {
        // this method is intentionally very dumb to avoid making this file even more unreadable

        if (this.channelsOpened.has(channelId)) {
            this.log.debug('channel %d opened again', channelId)
            this.channelsOpened.set(channelId, this.channelsOpened.get(channelId)! + 1)

            return false
        }

        this.channelsOpened.set(channelId, 1)
        this.log.debug('channel %d opened (pts=%d)', channelId, pts)

        // force fetch channel difference
        this._fetchChannelDifferenceViaUpdate(channelId, pts)

        return true
    }

    /**
     * **ADVANCED**
     *
     * Notify the updates manager that some channel was "closed".
     * Basically the opposite of {@link notifyChannelOpened}.
     * This is a low-level method, prefer using {@link closeChat} instead.
*
     * @param channelId  Bare channel ID
     * @returns  `true` if the chat was closed for the last time, `false` otherwise
     */
    notifyChannelClosed(channelId: number): boolean {
        const opened = this.channelsOpened.get(channelId)

        if (opened === undefined) {
            return false
        }

        if (opened > 1) {
            this.log.debug('channel %d closed, but is opened %d more times', channelId, opened - 1)
            this.channelsOpened.set(channelId, opened - 1)

            return false
        }

        this.channelsOpened.delete(channelId)

        // the periodic fetch is only scheduled for opened channels,
        // so a still-pending timer has nothing left to do
        const timer = this.channelDiffTimeouts.get(channelId)

        if (timer !== undefined) {
            clearTimeout(timer)
            this.channelDiffTimeouts.delete(channelId)
        }

        this.log.debug('channel %d closed', channelId)

        return true
    }

    ////////////////////////////////////////////// IMPLEMENTATION //////////////////////////////////////////////

    /**
     * Fetch the current updates state from the server and store it in
     * `pts`/`qts`/`date`/`seq`. Runs under `lock`.
     *
     * Errors are logged (while the client is connected) and re-thrown.
     */
    async _fetchUpdatesState(): Promise<void> {
        const { client, lock, log } = this

        await lock.acquire()

        log.debug('fetching initial state')

        try {
            let fetchedState = await client.call({ _: 'updates.getState' })

            log.debug(
                'updates.getState returned state: pts=%d, qts=%d, date=%d, seq=%d',
                fetchedState.pts,
                fetchedState.qts,
                fetchedState.date,
                fetchedState.seq,
            )

            // for some unknown reason getState may return old qts
            // call getDifference to get actual values :shrug:
            const diff = await client.call({
                _: 'updates.getDifference',
                pts: fetchedState.pts,
                qts: fetchedState.qts,
                date: fetchedState.date,
            })

            switch (diff._) {
                case 'updates.differenceEmpty':
                    break
                case 'updates.differenceTooLong': // shouldn't happen, but who knows?
                    (fetchedState as tl.Mutable<typeof fetchedState>).pts = diff.pts
                    break
                case 'updates.differenceSlice':
                    fetchedState = diff.intermediateState
                    break
                case 'updates.difference':
                    fetchedState = diff.state
                    break
                default:
                    assertNever(diff)
            }

            this.qts = fetchedState.qts
            this.pts = fetchedState.pts
            this.date = fetchedState.date
            this.seq = fetchedState.seq

            log.debug('loaded initial state: pts=%d, qts=%d, date=%d, seq=%d', this.pts, this.qts, this.date, this.seq)
        } catch (e) {
            if (this.client.isConnected) {
                log.error('failed to fetch updates state: %s', e)
            }

            throw e
        } finally {
            // released on every path, including when logging itself fails
            lock.release()
        }
    }

    /** Load the stored `[pts, qts, date, seq]` state, remembering it as the "old" (persisted) state too. */
    async _loadUpdatesStorage(): Promise<void> {
        const storedState = await this.client.storage.updates.getState()

        if (storedState) {
            this.pts = this.oldPts = storedState[0]
            this.qts = this.oldQts = storedState[1]
            this.date = this.oldDate = storedState[2]
            this.seq = this.oldSeq = storedState[3]

            this.log.debug(
                'loaded stored state: pts=%d, qts=%d, date=%d, seq=%d',
                storedState[0],
                storedState[1],
                storedState[2],
                storedState[3],
            )
        }
        // if no state, don't bother initializing properties
        // since that means that there is no authorization,
        // and thus fetchUpdatesState will be called
    }

    /**
     * Persist the parts of the updates state that differ from what was last
     * written, plus all modified channel pts.
     *
     * @param save  Whether to also call `client.mt.storage.save()` afterwards
     */
    async _saveUpdatesStorage(save = false): Promise<void> {
        const { client } = this

        // todo: move this to updates state service
        // before any authorization pts will be undefined
        if (this.pts !== undefined) {
            // if old* value is not available, assume it has changed.
            if (this.oldPts === undefined || this.oldPts !== this.pts) {
                await client.storage.updates.setPts(this.pts)
            }
            if (this.oldQts === undefined || this.oldQts !== this.qts) {
                await client.storage.updates.setQts(this.qts!)
            }
            if (this.oldDate === undefined || this.oldDate !== this.date) {
                await client.storage.updates.setDate(this.date!)
            }
            if (this.oldSeq === undefined || this.oldSeq !== this.seq) {
                await client.storage.updates.setSeq(this.seq!)
            }

            // update old* values
            this.oldPts = this.pts
            this.oldQts = this.qts
            this.oldDate = this.date
            this.oldSeq = this.seq

            await client.storage.updates.setManyChannelPts(this.cptsMod)
            this.cptsMod.clear()

            if (save) {
                await client.mt.storage.save()
            }
        }
    }

    /**
     * Record pts, qts and message ids of the given container in the
     * `noDispatchPts` / `noDispatchQts` / `noDispatchMsg` indexes.
     * Non-channel entries are stored under key `0`.
     */
    _addToNoDispatchIndex(updates?: tl.TypeUpdates): void {
        if (!updates) return

        const { noDispatchMsg, noDispatchPts, noDispatchQts } = this

        const addUpdate = (upd: tl.TypeUpdate) => {
            const channelId = extractChannelIdFromUpdate(upd) ?? 0
            const pts = 'pts' in upd ? upd.pts : undefined

            if (pts) {
                const set = noDispatchPts.get(channelId)
                if (!set) noDispatchPts.set(channelId, new Set([pts]))
                else set.add(pts)
            }

            const qts = 'qts' in upd ? upd.qts : undefined

            if (qts) {
                noDispatchQts.add(qts)
            }

            switch (upd._) {
                case 'updateNewMessage':
                case 'updateNewChannelMessage': {
                    const channelId = upd.message.peerId?._ === 'peerChannel' ? upd.message.peerId.channelId : 0

                    const set = noDispatchMsg.get(channelId)
                    if (!set) noDispatchMsg.set(channelId, new Set([upd.message.id]))
                    else set.add(upd.message.id)

                    break
                }
            }
        }

        switch (updates._) {
            case 'updates':
            case 'updatesCombined':
                updates.updates.forEach(addUpdate)
                break
            case 'updateShortMessage':
            case 'updateShortChatMessage':
            case 'updateShortSentMessage': {
                // these updates are only used for non-channel messages, so we use 0
                let set = noDispatchMsg.get(0)
                if (!set) noDispatchMsg.set(0, new Set([updates.id]))
                else set.add(updates.id)

                set = noDispatchPts.get(0)
                if (!set) noDispatchPts.set(0, new Set([updates.pts]))
                else set.add(updates.pts)
                break
            }
            case 'updateShort':
                addUpdate(updates.update)
                break
            case 'updatesTooLong':
                break
            default:
                assertNever(updates)
        }
    }

    /**
     * Make sure that the peers referenced by a message/draft update are present in
     * `peers`, loading them from the peers storage where they are not.
     *
     * @param upd  Update to inspect
     * @param peers  Peers index that came with the update; loaded peers are added to it
     * @param allowMissing
     *     When `false`, the scan stops at the first peer that can't be loaded.
     *     When `true`, such a peer is recorded and the scan continues.
     * @returns  Marked IDs of the peers that could not be loaded
     */
    async _fetchMissingPeers(upd: tl.TypeUpdate, peers: PeersIndex, allowMissing = false): Promise<Set<number>> {
        const { client } = this

        const missing = new Set<number>()

        // resolves to `false` when the caller should stop and return `missing`
        async function fetchPeer(peer?: tl.TypePeer | number) {
            if (!peer) return true

            const bare = typeof peer === 'number' ? parseMarkedPeerId(peer)[1] : getBarePeerId(peer)

            const marked = typeof peer === 'number' ? peer : getMarkedPeerId(peer)
            const index = marked > 0 ? peers.users : peers.chats

            if (index.has(bare)) return true
            // NOTE(review): a repeated missing peer stops the scan even when
            // `allowMissing` is set - confirm this is intended
            if (missing.has(marked)) return false

            const cached = await client.storage.peers.getCompleteById(marked)

            if (!cached) {
                missing.add(marked)

                return allowMissing
            }

            // whatever, ts is not smart enough to understand
            // NOTE(review): the original cast target was lost in SOURCE; `typeof cached` is a stand-in
            ;(index as Map<number, typeof cached>).set(bare, cached)

            return true
        }

        switch (upd._) {
            case 'updateNewMessage':
            case 'updateNewChannelMessage':
            case 'updateEditMessage':
            case 'updateEditChannelMessage': {
                const msg = upd.message
                if (msg._ === 'messageEmpty') return missing

                // ref: https://github.com/tdlib/td/blob/master/td/telegram/UpdatesManager.cpp
                // (search by UpdatesManager::is_acceptable_update)
                if (!(await fetchPeer(msg.peerId))) return missing
                if (!(await fetchPeer(msg.fromId))) return missing

                if (msg.replyTo) {
                    if (msg.replyTo._ === 'messageReplyHeader' && !(await fetchPeer(msg.replyTo.replyToPeerId))) {
                        return missing
                    }
                    if (msg.replyTo._ === 'messageReplyStoryHeader' && !(await fetchPeer(msg.replyTo.userId))) {
                        return missing
                    }
                }

                if (msg._ !== 'messageService') {
                    if (
                        msg.fwdFrom &&
                        (!(await fetchPeer(msg.fwdFrom.fromId)) || !(await fetchPeer(msg.fwdFrom.savedFromPeer)))
                    ) {
                        return missing
                    }
                    if (!(await fetchPeer(msg.viaBotId))) return missing

                    if (msg.entities) {
                        for (const ent of msg.entities) {
                            if (ent._ === 'messageEntityMentionName') {
                                if (!(await fetchPeer(ent.userId))) return missing
                            }
                        }
                    }

                    if (msg.media) {
                        switch (msg.media._) {
                            case 'messageMediaContact':
                                if (msg.media.userId && !(await fetchPeer(msg.media.userId))) {
                                    return missing
                                }
                        }
                    }
                } else {
                    switch (msg.action._) {
                        case 'messageActionChatCreate':
                        case 'messageActionChatAddUser':
                        case 'messageActionInviteToGroupCall':
                            for (const user of msg.action.users) {
                                if (!(await fetchPeer(user))) return missing
                            }
                            break
                        case 'messageActionChatJoinedByLink':
                            if (!(await fetchPeer(msg.action.inviterId))) {
                                return missing
                            }
                            break
                        case 'messageActionChatDeleteUser':
                            if (!(await fetchPeer(msg.action.userId))) return missing
                            break
                        case 'messageActionChatMigrateTo':
                            if (!(await fetchPeer(toggleChannelIdMark(msg.action.channelId)))) {
                                return missing
                            }
                            break
                        case 'messageActionChannelMigrateFrom':
                            if (!(await fetchPeer(-msg.action.chatId))) return missing
                            break
                        case 'messageActionGeoProximityReached':
                            if (!(await fetchPeer(msg.action.fromId))) return missing
                            if (!(await fetchPeer(msg.action.toId))) return missing
                            break
                    }
                }
                break
            }
            case 'updateDraftMessage':
                if ('entities' in upd.draft && upd.draft.entities) {
                    for (const ent of upd.draft.entities) {
                        if (ent._ === 'messageEntityMentionName') {
                            if (!(await fetchPeer(ent.userId))) return missing
                        }
                    }
                }
        }

        return missing
    }

    /**
     * For a message sent in a channel, store a "reference message" record
     * (`refMsgs.store(peer, channel, msgId)`) for every peer the message mentions.
     * Messages outside of channels and empty messages are ignored.
     */
    async _storeMessageReferences(msg: tl.TypeMessage): Promise<void> {
        if (msg._ === 'messageEmpty') return

        const { client } = this

        const peerId = msg.peerId
        if (peerId._ !== 'peerChannel') return

        const channelId = toggleChannelIdMark(peerId.channelId)

        // NOTE(review): element type restored as `MaybeAsync<void>` - confirm against `refMsgs.store`
        const promises: MaybeAsync<void>[] = []

        function store(peer?: tl.TypePeer | number | number[]): void {
            if (!peer) return

            if (Array.isArray(peer)) {
                peer.forEach(store)

                return
            }

            const marked = typeof peer === 'number' ? peer : getMarkedPeerId(peer)

            promises.push(client.storage.refMsgs.store(marked, channelId, msg.id))
        }

        // reference: https://github.com/tdlib/td/blob/master/td/telegram/MessagesManager.cpp
        // (search by get_message_user_ids, get_message_channel_ids)
        store(msg.fromId)

        if (msg._ === 'message') {
            store(msg.viaBotId)
            store(msg.fwdFrom?.fromId)

            if (msg.media) {
                switch (msg.media._) {
                    case 'messageMediaWebPage':
                        if (msg.media.webpage._ === 'webPage' && msg.media.webpage.attributes) {
                            for (const attr of msg.media.webpage.attributes) {
                                if (attr._ === 'webPageAttributeStory') {
                                    store(attr.peer)
                                }
                            }
                        }
                        break
                    case 'messageMediaContact':
                        store(msg.media.userId)
                        break
                    case 'messageMediaStory':
                        store(msg.media.peer)
                        break
                    case 'messageMediaGiveaway':
                        store(msg.media.channels.map(toggleChannelIdMark))
                        break
                }
            }
        } else {
            switch (msg.action._) {
                case 'messageActionChatCreate':
                case 'messageActionChatAddUser':
                case 'messageActionInviteToGroupCall':
                    store(msg.action.users)
                    break
                case 'messageActionChatDeleteUser':
                    store(msg.action.userId)
                    break
            }
        }

        if (msg.replyTo) {
            switch (msg.replyTo._) {
                case 'messageReplyHeader':
                    store(msg.replyTo.replyToPeerId)
                    store(msg.replyTo.replyFrom?.fromId)
                    break
                case 'messageReplyStoryHeader':
                    store(msg.replyTo.userId)
                    break
            }
            // in fact, we can also use peers contained in the replied-to message,
            // but we don't fetch it automatically, so we can't know which peers are there
        }

        await Promise.all(promises)
    }

    /**
     * Fetch the difference for a single channel and queue everything it contains
     * as unordered updates.
     *
     * The base pts is taken from `cpts`, then (while catching up channels) from
     * the storage, then from `fallbackPts`.
     *
     * @param channelId  Bare channel ID
     * @param fallbackPts  pts to use when none is known locally
     * @returns  `false` if the base pts or the channel peer is not available, `true` otherwise
     */
    async _fetchChannelDifference(channelId: number, fallbackPts?: number): Promise<boolean> {
        const { channelDiffTimeouts, cpts, cptsMod, channelsOpened, client, log, pendingUnorderedUpdates } = this

        // clear timeout if any
        if (channelDiffTimeouts.has(channelId)) {
            clearTimeout(channelDiffTimeouts.get(channelId))
            channelDiffTimeouts.delete(channelId)
        }

        let _pts: number | null | undefined = cpts.get(channelId)

        if (!_pts && this.catchUpChannels) {
            _pts = await client.storage.updates.getChannelPts(channelId)
        }
        if (!_pts) _pts = fallbackPts

        if (!_pts) {
            log.debug('fetchChannelDifference failed for channel %d: base pts not available', channelId)

            return false
        }

        const channelPeer = await client.storage.peers.getById(toggleChannelIdMark(channelId))

        if (!channelPeer) {
            log.debug('fetchChannelDifference failed for channel %d: input peer not found', channelId)

            return false
        }

        const channel = toInputChannel(channelPeer)

        // to make TS happy
        let pts = _pts
        let limit = this.auth?.isBot ? 100000 : 100

        if (pts <= 0) {
            pts = 1
            limit = 1
        }

        // timeout reported by the most recent response that had one
        let lastTimeout = 0

        for (;;) {
            const diff = await client.call({
                _: 'updates.getChannelDifference',
                force: true, // Set to true to skip some possibly unneeded updates and reduce server-side load
                channel,
                pts,
                limit,
                filter: { _: 'channelMessagesFilterEmpty' },
            })

            if (diff.timeout) lastTimeout = diff.timeout

            if (diff._ === 'updates.channelDifferenceEmpty') {
                log.debug('getChannelDifference (cid = %d) returned channelDifferenceEmpty', channelId)
                break
            }

            const peers = PeersIndex.from(diff)

            if (diff._ === 'updates.channelDifferenceTooLong') {
                if (diff.dialog._ === 'dialog') {
                    pts = diff.dialog.pts!
                }

                log.warn(
                    'getChannelDifference (cid = %d) returned channelDifferenceTooLong. new pts: %d, recent msgs: %d',
                    channelId,
                    pts,
                    diff.messages.length,
                )

                diff.messages.forEach((message) => {
                    log.debug(
                        'processing message %d (%s) from TooLong diff for channel %d',
                        message.id,
                        message._,
                        channelId,
                    )

                    if (message._ === 'messageEmpty') return

                    pendingUnorderedUpdates.pushBack(toPendingUpdate(messageToUpdate(message), peers, true))
                })
                break
            }

            log.debug(
                'getChannelDifference (cid = %d) returned %d messages, %d updates. new pts: %d, final: %b',
                channelId,
                diff.newMessages.length,
                diff.otherUpdates.length,
                diff.pts,
                diff.final,
            )

            diff.newMessages.forEach((message) => {
                log.debug('processing message %d (%s) from diff for channel %d', message.id, message._, channelId)

                if (message._ === 'messageEmpty') return

                pendingUnorderedUpdates.pushBack(toPendingUpdate(messageToUpdate(message), peers, true))
            })

            diff.otherUpdates.forEach((upd) => {
                const parsed = toPendingUpdate(upd, peers, true)

                log.debug(
                    'processing %s from diff for channel %d, pts_before: %d, pts: %d',
                    upd._,
                    channelId,
                    parsed.ptsBefore,
                    parsed.pts,
                )

                if (isMessageEmpty(upd)) return

                pendingUnorderedUpdates.pushBack(parsed)
            })

            pts = diff.pts

            if (diff.final) break
        }

        cpts.set(channelId, pts)
        cptsMod.set(channelId, pts)

        // schedule next fetch
        if (lastTimeout !== 0 && channelsOpened.has(channelId)) {
            log.debug('scheduling next fetch for channel %d in %d seconds', channelId, lastTimeout)
            channelDiffTimeouts.set(
                channelId,
                setTimeout(() => this._fetchChannelDifferenceViaUpdate(channelId), lastTimeout * 1000),
            )
        }

        return true
    }

    /**
     * Start fetching the difference for a channel unless one is already registered
     * in `requestedDiff`. The entry is removed once the fetch settles; when the
     * fetch fails or returns `false`, the common difference is requested instead.
     */
    _fetchChannelDifferenceLater(
        requestedDiff: Map<number, Promise<void>>,
        channelId: number,
        fallbackPts?: number,
    ): void {
        if (!requestedDiff.has(channelId)) {
            requestedDiff.set(
                channelId,
                this._fetchChannelDifference(channelId, fallbackPts)
                    .catch((err) => {
                        // resolves to `undefined`, which takes the fallback path below
                        this.log.warn('error fetching difference for %d: %s', channelId, err)
                    })
                    .then((ok) => {
                        requestedDiff.delete(channelId)

                        if (!ok) {
                            this.log.debug('channel difference for %d failed, falling back to common diff', channelId)
                            this._fetchDifferenceLater(requestedDiff)
                        }
                    }),
            )
        }
    }

    /** Request a channel difference by queueing a dummy container with `updateChannelTooLong`. */
    _fetchChannelDifferenceViaUpdate(channelId: number, pts?: number): void {
        this.handleUpdate(
            createDummyUpdatesContainer([
                {
                    _: 'updateChannelTooLong',
                    channelId,
                    pts,
                },
            ]),
        )
    }

    /**
     * Fetch the common difference (slice by slice, until the final one) and queue
     * its contents. `updateChannelTooLong` entries trigger a channel difference
     * fetch registered in `requestedDiff`.
     */
    async _fetchDifference(requestedDiff: Map<number, Promise<void>>): Promise<void> {
        const { client, log, pendingPtsUpdates, pendingUnorderedUpdates } = this

        for (;;) {
            const diff = await client.call({
                _: 'updates.getDifference',
                pts: this.pts!,
                date: this.date!,
                qts: this.qts!,
            })

            switch (diff._) {
                case 'updates.differenceEmpty':
                    log.debug('updates.getDifference returned updates.differenceEmpty')

                    return
                case 'updates.differenceTooLong':
                    this.pts = diff.pts
                    log.debug('updates.getDifference returned updates.differenceTooLong')

                    return
            }

            const fetchedState = diff._ === 'updates.difference' ? diff.state : diff.intermediateState

            log.debug(
                'updates.getDifference returned %d messages, %d updates. new pts: %d, qts: %d, seq: %d, final: %b',
                diff.newMessages.length,
                diff.otherUpdates.length,
                fetchedState.pts,
                fetchedState.qts,
                fetchedState.seq,
                diff._ === 'updates.difference',
            )

            const peers = PeersIndex.from(diff)

            diff.newMessages.forEach((message) => {
                log.debug('processing message %d in %j (%s) from common diff', message.id, message.peerId, message._)

                if (message._ === 'messageEmpty') return

                // pts does not need to be checked for them
                pendingUnorderedUpdates.pushBack(toPendingUpdate(messageToUpdate(message), peers, true))
            })

            diff.otherUpdates.forEach((upd) => {
                if (upd._ === 'updateChannelTooLong') {
                    log.debug(
                        'received updateChannelTooLong for channel %d in common diff (pts = %d), fetching diff',
                        upd.channelId,
                        upd.pts,
                    )

                    this._fetchChannelDifferenceLater(requestedDiff, upd.channelId, upd.pts)

                    return
                }

                if (isMessageEmpty(upd)) return

                const parsed = toPendingUpdate(upd, peers, true)

                if (parsed.channelId && parsed.ptsBefore) {
                    // we need to check pts for these updates, put into pts queue
                    pendingPtsUpdates.add(parsed)
                } else {
                    // the updates are in order already, we can treat them as unordered
                    pendingUnorderedUpdates.pushBack(parsed)
                }

                log.debug(
                    'received %s from common diff, cid: %d, pts_before: %d, pts: %d, qts_before: %d',
                    upd._,
                    parsed.channelId,
                    parsed.ptsBefore,
                    parsed.pts,
                    parsed.qtsBefore,
                )
            })

            this.pts = fetchedState.pts
            this.qts = fetchedState.qts
            this.seq = fetchedState.seq
            this.date = fetchedState.date

            if (diff._ === 'updates.difference') {
                return
            }
        }
    }

    /**
     * Start fetching the common difference unless one is already registered in
     * `requestedDiff` (under key `0`). The entry is removed once the fetch settles,
     * whether it succeeded or not.
     */
    _fetchDifferenceLater(requestedDiff: Map<number, Promise<void>>): void {
        if (!requestedDiff.has(0)) {
            requestedDiff.set(
                0,
                this._fetchDifference(requestedDiff)
                    .catch((err) => {
                        if (tl.RpcError.is(err, 'AUTH_KEY_UNREGISTERED')) {
                            // for some reason, when logging out telegram may send updatesTooLong
                            // in any case, we need to stop updates loop
                            this.stopLoop()

                            return
                        }

                        this.log.warn('error fetching common difference: %s', err)

                        if (tl.RpcError.is(err, 'PERSISTENT_TIMESTAMP_INVALID')) {
                            // re-fetching the state may itself reject (it re-throws its errors);
                            // the rejection handler below still removes the entry in that case
                            return this._fetchUpdatesState()
                        }
                    })
                    .then(
                        () => {
                            requestedDiff.delete(0)
                        },
                        (err) => {
                            requestedDiff.delete(0)
                            throw err
                        },
                    ),
            )
        }
    }

    async _onUpdate(
        pending: PendingUpdate,
        requestedDiff: Map<number, Promise<void>>,
        postponed = false,
        unordered = false,
    ): Promise<void> {
        const { client, log } = this
        const upd = pending.update

        let missing: Set<number> | undefined = undefined

        // it is important to do this before updating pts
        if (pending.peers.hasMin || pending.peers.empty) {
            // even if we have min peers in difference, we can't do anything about them.
            // we still want to collect them, so we can fetch them in the background.
            // we won't wait for them, since that would block the updates loop
            log.debug('loading missing peers for %s (pts = %d, cid = %d)', upd._, pending.pts, pending.channelId)
            missing = await this._fetchMissingPeers(upd, pending.peers, pending.fromDifference)

            if (!pending.fromDifference && missing.size) {
                log.debug(
                    'fetching difference because some peers were min (%J) and not cached for %s (pts = %d, cid = %d)',
                    missing,
                    upd._,
                    pending.pts,
                    pending.channelId,
                )

                if (pending.channelId && !(upd._ === 'updateNewChannelMessage' && upd.message._ === 'messageService')) {
                    // don't replace service messages, because they can be about bot's kicking
                    this._fetchChannelDifferenceLater(requestedDiff, pending.channelId, pending.ptsBefore)
                } else {
                    this._fetchDifferenceLater(requestedDiff)
                }

                return
            }

            if (missing.size) {
                log.debug(
                    'peers still missing after fetching difference: %J for %s (pts = %d, cid = %d)',
                    missing,
                    upd._,
                    pending.pts,
                    pending.channelId,
                )
            }
        }

        // apply new pts/qts, if applicable
        if (!unordered) {
// because unordered may contain pts/qts values when received from diff if (pending.pts) { const localPts = pending.channelId ? this.cpts.get(pending.channelId) : this.pts if (localPts && pending.ptsBefore !== localPts) { log.warn( 'pts_before does not match local_pts for %s (cid = %d, pts_before = %d, pts = %d, local_pts = %d)', upd._, pending.channelId, pending.ptsBefore, pending.pts, localPts, ) } log.debug( 'applying new pts (cid = %d) because received %s: %d -> %d (before: %d, count: %d) (postponed = %s)', pending.channelId, upd._, localPts, pending.pts, pending.ptsBefore, pending.pts - pending.ptsBefore!, postponed, ) if (pending.channelId) { this.cpts.set(pending.channelId, pending.pts) this.cptsMod.set(pending.channelId, pending.pts) } else { this.pts = pending.pts } } if (pending.qtsBefore) { log.debug( 'applying new qts because received %s: %d -> %d (postponed = %s)', upd._, this.qts, pending.qtsBefore + 1, postponed, ) this.qts = pending.qts } } if (isMessageEmpty(upd)) return // this.rpsProcessing?.hit() // updates that are also used internally switch (upd._) { case 'mtcute.dummyUpdate': // we just needed to apply new pts values return case 'updateDcOptions': { const config = client.mt.network.config.getNow() if (config) { client.mt.network.config.setConfig({ ...config, dcOptions: upd.dcOptions, }) } else { client.mt.network.config.update(true).catch((err) => client.emitError(err)) } break } case 'updateConfig': client.mt.network.config.update(true).catch((err) => client.emitError(err)) break case 'updateUserName': // todo // if (upd.userId === state.auth?.userId) { // state.auth.selfUsername = upd.usernames.find((it) => it.active)?.username ?? 
null // } break case 'updateDeleteChannelMessages': if (!this.auth?.isBot) { await client.storage.refMsgs.delete(toggleChannelIdMark(upd.channelId), upd.messages) } break case 'updateNewMessage': case 'updateEditMessage': case 'updateNewChannelMessage': case 'updateEditChannelMessage': if (!this.auth?.isBot) { await this._storeMessageReferences(upd.message) } break } if (missing?.size) { if (this.auth?.isBot) { this.log.warn( 'missing peers (%J) after getDifference for %s (pts = %d, cid = %d)', missing, upd._, pending.pts, pending.channelId, ) } else { // force save storage so the min peers are stored await client.mt.storage.save() for (const id of missing) { Promise.resolve(client.storage.peers.getById(id)) .then((peer): unknown => { if (!peer) { this.log.warn('cannot fetch full peer %d - getPeerById returned null', id) return } // the peer will be automatically cached by the `.call()`, we don't have to do anything // todo // if (isInputPeerChannel(peer)) { // return _getChannelsBatched(client, toInputChannel(peer)) // } else if (isInputPeerUser(peer)) { // return _getUsersBatched(client, toInputUser(peer)) // } log.warn('cannot fetch full peer %d - unknown peer type %s', id, peer._) }) .catch((err) => { log.warn('error fetching full peer %d: %s', id, err) }) } } } // dispatch the update if (this.noDispatchEnabled) { const channelId = pending.channelId ?? 0 const msgId = upd._ === 'updateNewMessage' || upd._ === 'updateNewChannelMessage' ? upd.message.id : undefined // we first need to remove it from each index, and then check if it was there const foundByMsgId = msgId && this.noDispatchMsg.get(channelId)?.delete(msgId) const foundByPts = this.noDispatchPts.get(channelId)?.delete(pending.pts!) const foundByQts = this.noDispatchQts.delete(pending.qts!) 
if (foundByMsgId || foundByPts || foundByQts) { log.debug('not dispatching %s because it is in no_dispatch index', upd._) return } } log.debug('dispatching %s (postponed = %s)', upd._, postponed) this._handler(upd, pending.peers) } async _loop(): Promise { const { log, client, cpts, cptsMod, pendingUpdateContainers, pendingPtsUpdates, pendingPtsUpdatesPostponed, pendingQtsUpdates, pendingQtsUpdatesPostponed, pendingUnorderedUpdates, updatesLoopCv, postponedTimer, } = this log.debug('updates loop started, state available? %b', this.pts) try { if (!this.pts) { await this._fetchUpdatesState() } while (this.updatesLoopActive) { if ( !( pendingUpdateContainers.length || pendingPtsUpdates.length || pendingQtsUpdates.length || pendingUnorderedUpdates.length || this.hasTimedoutPostponed ) ) { await updatesLoopCv.wait() } if (!this.updatesLoopActive) break log.debug( 'updates loop tick. pending containers: %d, pts: %d, pts_postponed: %d, qts: %d, qts_postponed: %d, unordered: %d', pendingUpdateContainers.length, pendingPtsUpdates.length, pendingPtsUpdatesPostponed.length, pendingQtsUpdates.length, pendingQtsUpdatesPostponed.length, pendingUnorderedUpdates.length, ) const requestedDiff = new Map>() // first process pending containers while (pendingUpdateContainers.length) { const { upd, seqStart, seqEnd } = pendingUpdateContainers.popFront()! switch (upd._) { case 'updatesTooLong': log.debug('received updatesTooLong, fetching difference') this._fetchDifferenceLater(requestedDiff) break case 'updatesCombined': case 'updates': { if (seqStart !== 0) { // https://t.me/tdlibchat/5843 const nextLocalSeq = this.seq! 
+ 1 log.debug( 'received seq-ordered %s (seq_start = %d, seq_end = %d, size = %d)', upd._, seqStart, seqEnd, upd.updates.length, ) if (nextLocalSeq > seqStart) { log.debug( 'ignoring updates group because already applied (by seq: exp %d, got %d)', nextLocalSeq, seqStart, ) // "the updates were already applied, and must be ignored" continue } if (nextLocalSeq < seqStart) { log.debug( 'fetching difference because gap detected (by seq: exp %d, got %d)', nextLocalSeq, seqStart, ) // "there's an updates gap that must be filled" this._fetchDifferenceLater(requestedDiff) } } else { log.debug('received %s (size = %d)', upd._, upd.updates.length) } await client.storage.peers.updatePeersFrom(upd) const peers = PeersIndex.from(upd) for (const update of upd.updates) { switch (update._) { case 'updateChannelTooLong': log.debug( 'received updateChannelTooLong for channel %d (pts = %d) in container, fetching diff', update.channelId, update.pts, ) this._fetchChannelDifferenceLater(requestedDiff, update.channelId, update.pts) continue case 'updatePtsChanged': // see https://github.com/tdlib/td/blob/07c1d53a6d3cb1fad58d2822e55eef6d57363581/td/telegram/UpdatesManager.cpp#L4051 if (client.mt.network.getPoolSize('main') > 1) { // highload bot log.debug( 'updatePtsChanged received, resetting pts to 1 and fetching difference', ) this.pts = 1 this._fetchDifferenceLater(requestedDiff) } else { log.debug('updatePtsChanged received, fetching updates state') await this._fetchUpdatesState() } continue } const parsed = toPendingUpdate(update, peers) if (parsed.ptsBefore !== undefined) { pendingPtsUpdates.add(parsed) } else if (parsed.qtsBefore !== undefined) { pendingQtsUpdates.add(parsed) } else { pendingUnorderedUpdates.pushBack(parsed) } } if (seqEnd !== 0 && seqEnd > this.seq!) 
{ this.seq = seqEnd this.date = upd.date } break } case 'updateShort': { log.debug('received short %s', upd._) const parsed = toPendingUpdate(upd.update, new PeersIndex()) if (parsed.ptsBefore !== undefined) { pendingPtsUpdates.add(parsed) } else if (parsed.qtsBefore !== undefined) { pendingQtsUpdates.add(parsed) } else { pendingUnorderedUpdates.pushBack(parsed) } break } case 'updateShortMessage': { log.debug('received updateShortMessage') const message: tl.RawMessage = { _: 'message', out: upd.out, mentioned: upd.mentioned, mediaUnread: upd.mediaUnread, silent: upd.silent, id: upd.id, fromId: { _: 'peerUser', userId: upd.out ? this.auth!.userId : upd.userId, }, peerId: { _: 'peerUser', userId: upd.userId, }, fwdFrom: upd.fwdFrom, viaBotId: upd.viaBotId, replyTo: upd.replyTo, date: upd.date, message: upd.message, entities: upd.entities, ttlPeriod: upd.ttlPeriod, } const update: tl.RawUpdateNewMessage = { _: 'updateNewMessage', message, pts: upd.pts, ptsCount: upd.ptsCount, } pendingPtsUpdates.add({ update, ptsBefore: upd.pts - upd.ptsCount, pts: upd.pts, peers: new PeersIndex(), fromDifference: false, }) break } case 'updateShortChatMessage': { log.debug('received updateShortChatMessage') const message: tl.RawMessage = { _: 'message', out: upd.out, mentioned: upd.mentioned, mediaUnread: upd.mediaUnread, silent: upd.silent, id: upd.id, fromId: { _: 'peerUser', userId: upd.fromId, }, peerId: { _: 'peerChat', chatId: upd.chatId, }, fwdFrom: upd.fwdFrom, viaBotId: upd.viaBotId, replyTo: upd.replyTo, date: upd.date, message: upd.message, entities: upd.entities, ttlPeriod: upd.ttlPeriod, } const update: tl.RawUpdateNewMessage = { _: 'updateNewMessage', message, pts: upd.pts, ptsCount: upd.ptsCount, } pendingPtsUpdates.add({ update, ptsBefore: upd.pts - upd.ptsCount, pts: upd.pts, peers: new PeersIndex(), fromDifference: false, }) break } case 'updateShortSentMessage': { // should not happen log.warn('received updateShortSentMessage') break } default: assertNever(upd) } 
} // process pts-ordered updates while (pendingPtsUpdates.length) { const pending = pendingPtsUpdates.popFront()! const upd = pending.update // check pts let localPts: number | null = null if (!pending.channelId) localPts = this.pts! else if (cpts.has(pending.channelId)) { localPts = cpts.get(pending.channelId)! } else if (this.catchUpChannels) { // only load stored channel pts in case // the user has enabled catching up. // not loading stored pts effectively disables // catching up, but doesn't interfere with further // update gaps (i.e. first update received is considered // to be the base state) const saved = await client.storage.updates.getChannelPts(pending.channelId) if (saved) { cpts.set(pending.channelId, saved) localPts = saved } } if (localPts) { const diff = localPts - pending.ptsBefore! // PTS can only go up or drop cardinally const isPtsDrop = diff > 1000009 if (diff > 0 && !isPtsDrop) { // "the update was already applied, and must be ignored" log.debug( 'ignoring %s (cid = %d) because already applied (by pts: exp %d, got %d)', upd._, pending.channelId, localPts, pending.ptsBefore, ) continue } if (diff < 0) { // "there's an update gap that must be filled" // if the gap is less than 3, put the update into postponed queue // otherwise, call getDifference if (diff > -3) { log.debug( 'postponing %s for 0.5s (cid = %d) because small gap detected (by pts: exp %d, got %d, diff=%d)', upd._, pending.channelId, localPts, pending.ptsBefore, diff, ) pending.timeout = Date.now() + 500 pendingPtsUpdatesPostponed.add(pending) postponedTimer.emitBefore(pending.timeout) } else if (diff > -1000000) { log.debug( 'fetching difference after %s (cid = %d) because pts gap detected (by pts: exp %d, got %d, diff=%d)', upd._, pending.channelId, localPts, pending.ptsBefore, diff, ) if (pending.channelId) { this._fetchChannelDifferenceLater(requestedDiff, pending.channelId) } else { this._fetchDifferenceLater(requestedDiff) } } else { log.debug( 'skipping all updates because pts 
gap is too big (by pts: exp %d, got %d, diff=%d)', localPts, pending.ptsBefore, diff, ) if (pending.channelId) { cpts.set(pending.channelId, 0) cptsMod.set(pending.channelId, 0) } else { await this._fetchUpdatesState() } } continue } if (isPtsDrop) { log.debug('pts drop detected (%d -> %d)', localPts, pending.ptsBefore) } } await this._onUpdate(pending, requestedDiff) } // process postponed pts-ordered updates for (let item = pendingPtsUpdatesPostponed._first; item; item = item.n) { // awesome fucking iteration because i'm so fucking tired and wanna kms const pending = item.v const upd = pending.update let localPts if (!pending.channelId) localPts = this.pts! else if (cpts.has(pending.channelId)) { localPts = cpts.get(pending.channelId) } // channel pts from storage will be available because we loaded it earlier if (!localPts) { log.warn( 'local pts not available for postponed %s (cid = %d), skipping', upd._, pending.channelId, ) continue } // check the pts to see if the gap was filled if (localPts > pending.ptsBefore!) { // "the update was already applied, and must be ignored" log.debug( 'ignoring postponed %s (cid = %d) because already applied (by pts: exp %d, got %d)', upd._, pending.channelId, localPts, pending.ptsBefore, ) pendingPtsUpdatesPostponed._remove(item) continue } if (localPts < pending.ptsBefore!) { // "there's an update gap that must be filled" // if the timeout has not expired yet, keep the update in the queue // otherwise, fetch diff const now = Date.now() if (now < pending.timeout!) { log.debug( 'postponed %s (cid = %d) is still waiting (%dms left) (current pts %d, need %d)', upd._, pending.channelId, pending.timeout! 
- now, localPts, pending.ptsBefore, ) } else { log.debug( "gap for postponed %s (cid = %d) wasn't filled, fetching diff (current pts %d, need %d)", upd._, pending.channelId, localPts, pending.ptsBefore, ) pendingPtsUpdatesPostponed._remove(item) if (pending.channelId) { this._fetchChannelDifferenceLater(requestedDiff, pending.channelId) } else { this._fetchDifferenceLater(requestedDiff) } } continue } await this._onUpdate(pending, requestedDiff, true) pendingPtsUpdatesPostponed._remove(item) } // process qts-ordered updates while (pendingQtsUpdates.length) { const pending = pendingQtsUpdates.popFront()! const upd = pending.update // check qts const diff = this.qts! - pending.qtsBefore! const isQtsDrop = diff > 1000009 if (diff > 0 && !isQtsDrop) { // "the update was already applied, and must be ignored" log.debug( 'ignoring %s because already applied (by qts: exp %d, got %d)', upd._, this.qts!, pending.qtsBefore, ) continue } if (this.qts! < pending.qtsBefore!) { // "there's an update gap that must be filled" // if the gap is less than 3, put the update into postponed queue // otherwise, call getDifference if (diff > -3) { log.debug( 'postponing %s for 0.5s because small gap detected (by qts: exp %d, got %d, diff=%d)', upd._, this.qts!, pending.qtsBefore, diff, ) pending.timeout = Date.now() + 500 pendingQtsUpdatesPostponed.add(pending) postponedTimer.emitBefore(pending.timeout) } else { log.debug( 'fetching difference after %s because qts gap detected (by qts: exp %d, got %d, diff=%d)', upd._, this.qts!, pending.qtsBefore, diff, ) this._fetchDifferenceLater(requestedDiff) } continue } if (isQtsDrop) { log.debug('qts drop detected (%d -> %d)', this.qts, pending.qtsBefore) } await this._onUpdate(pending, requestedDiff) } // process postponed qts-ordered updates for (let item = pendingQtsUpdatesPostponed._first; item; item = item.n) { // awesome fucking iteration because i'm so fucking tired and wanna kms const pending = item.v const upd = pending.update // check the 
pts to see if the gap was filled if (this.qts! > pending.qtsBefore!) { // "the update was already applied, and must be ignored" log.debug( 'ignoring postponed %s because already applied (by qts: exp %d, got %d)', upd._, this.qts!, pending.qtsBefore, ) continue } if (this.qts! < pending.qtsBefore!) { // "there's an update gap that must be filled" // if the timeout has not expired yet, keep the update in the queue // otherwise, fetch diff const now = Date.now() if (now < pending.timeout!) { log.debug( 'postponed %s is still waiting (%dms left) (current qts %d, need %d)', upd._, pending.timeout! - now, this.qts!, pending.qtsBefore, ) } else { log.debug( "gap for postponed %s wasn't filled, fetching diff (current qts %d, need %d)", upd._, this.qts!, pending.qtsBefore, ) pendingQtsUpdatesPostponed._remove(item) this._fetchDifferenceLater(requestedDiff) } continue } // gap was filled, and the update can be applied await this._onUpdate(pending, requestedDiff, true) pendingQtsUpdatesPostponed._remove(item) } this.hasTimedoutPostponed = false // wait for all pending diffs to load while (requestedDiff.size) { log.debug( 'waiting for %d pending diffs before processing unordered: %J', requestedDiff.size, requestedDiff.keys(), ) await Promise.all([...requestedDiff.values()]) // diff results may as well contain new diffs to be requested log.debug( 'pending diffs awaited, new diffs requested: %d (%J)', requestedDiff.size, requestedDiff.keys(), ) } // process unordered updates (or updates received from diff) while (pendingUnorderedUpdates.length) { const pending = pendingUnorderedUpdates.popFront()! 
await this._onUpdate(pending, requestedDiff, false, true) } // onUpdate may also call getDiff in some cases, so we also need to check // diff may also contain new updates, which will be processed in the next tick, // but we don't want to postpone diff fetching while (requestedDiff.size) { log.debug( 'waiting for %d pending diffs after processing unordered: %J', requestedDiff.size, requestedDiff.keys(), ) await Promise.all([...requestedDiff.values()]) // diff results may as well contain new diffs to be requested log.debug( 'pending diffs awaited, new diffs requested: %d (%j)', requestedDiff.size, requestedDiff.keys(), ) } // save new update state await this._saveUpdatesStorage(true) } log.debug('updates loop stopped') } catch (e) { log.error('updates loop encountered error, restarting: %s', e) return this._loop() } } }