diff --git a/packages/client/src/client.ts b/packages/client/src/client.ts index cedd949a..0f73159b 100644 --- a/packages/client/src/client.ts +++ b/packages/client/src/client.ts @@ -3479,9 +3479,11 @@ export class TelegramClient extends BaseTelegramClient { protected _defaultParseMode: string | null protected _updLock: AsyncLock protected _pts?: number + protected _qts?: number protected _date?: number protected _seq?: number protected _oldPts?: number + protected _oldQts?: number protected _oldDate?: number protected _oldSeq?: number protected _selfChanged: boolean diff --git a/packages/client/src/methods/updates.ts b/packages/client/src/methods/updates.ts index c108080e..864e9f2a 100644 --- a/packages/client/src/methods/updates.ts +++ b/packages/client/src/methods/updates.ts @@ -28,12 +28,14 @@ interface UpdatesState { // so store everything here, and load & save // every time session is loaded & saved. _pts?: number + _qts?: number _date?: number _seq?: number // old values of the updates state (i.e. as in DB) // used to avoid redundant storage calls _oldPts?: number + _oldQts?: number _oldDate?: number _oldSeq?: number _selfChanged: boolean @@ -70,12 +72,14 @@ function _initializeUpdates(this: TelegramClient) { */ export async function _fetchUpdatesState(this: TelegramClient): Promise { const state = await this.call({ _: 'updates.getState' }) + this._qts = state.qts this._pts = state.pts this._date = state.date this._seq = state.seq debug( - 'loaded initial state: pts=%d, date=%d, seq=%d', + 'loaded initial state: pts=%d, qts=%d, date=%d, seq=%d', state.pts, + state.qts, state.date, state.seq ) @@ -90,8 +94,9 @@ export async function _loadStorage(this: TelegramClient): Promise { const state = await this.storage.getUpdatesState() if (state) { this._pts = this._oldPts = state[0] - this._date = this._oldDate = state[1] - this._seq = this._oldSeq = state[2] + this._qts = this._oldQts = state[1] + this._date = this._oldDate = state[2] + this._seq = this._oldSeq = state[3] } // if no state, don't bother initializing properties // since that means that there is no authorization, @@ -128,6 +133,8 @@ export async function _saveStorage( // if old* value is not available, assume it has changed. if (this._oldPts === undefined || this._oldPts !== this._pts) await this.storage.setUpdatesPts(this._pts) + if (this._oldQts === undefined || this._oldQts !== this._qts) + await this.storage.setUpdatesPts(this._qts!) if (this._oldDate === undefined || this._oldDate !== this._date) await this.storage.setUpdatesDate(this._date!) if (this._oldSeq === undefined || this._oldSeq !== this._seq) @@ -135,6 +142,7 @@ export async function _saveStorage( // update old* values this._oldPts = this._pts + this._oldQts = this._qts this._oldDate = this._date this._oldSeq = this._seq @@ -178,39 +186,53 @@ interface NoDispatchIndex { msg: Record> // channel id or 0 => pts pts: Record> + qts: Record } // creating and using a no-dispatch index is pretty expensive, // but its not a big deal since it's actually rarely needed function _createNoDispatchIndex( - updates?: tl.TypeUpdates + updates?: tl.TypeUpdates | tl.TypeUpdate ): NoDispatchIndex | undefined { if (!updates) return undefined const ret: NoDispatchIndex = { msg: {}, pts: {}, + qts: {}, + } + + function addUpdate(upd: tl.TypeUpdate) { + const cid = extractChannelIdFromUpdate(upd) ?? 0 + const pts = 'pts' in upd ? upd.pts : undefined + + if (pts) { + if (!ret.pts[cid]) ret.pts[cid] = {} + ret.pts[cid][pts] = true + } + + const qts = 'qts' in upd ? upd.qts : undefined + if (qts) { + ret.qts[qts] = true + } + + switch (upd._) { + case 'updateNewMessage': + case 'updateNewChannelMessage': { + const cid = + upd.message.peerId?._ === 'peerChannel' + ? upd.message.peerId.channelId + : 0 + if (!ret.msg[cid]) ret.msg[cid] = {} + ret.msg[cid][upd.message.id] = true + break + } + } } switch (updates._) { case 'updates': case 'updatesCombined': - updates.updates.forEach((upd) => { - const cid = extractChannelIdFromUpdate(upd) ?? 0 - switch (upd._) { - case 'updateNewMessage': - case 'updateNewChannelMessage': - if (!ret.msg[cid]) ret.msg[cid] = {} - ret.msg[cid][upd.message.id] = true - break - } - - const pts = 'pts' in upd ? upd.pts : undefined - - if (pts) { - if (!ret.msg[cid]) ret.msg[cid] = {} - ret.msg[cid][pts] = true - } - }) + updates.updates.forEach(addUpdate) break case 'updateShortMessage': case 'updateShortChatMessage': @@ -222,6 +244,14 @@ function _createNoDispatchIndex( ret.msg[0][updates.id] = true ret.pts[0][updates.pts] = true break + case 'updateShort': + addUpdate(updates.update) + break + case 'updatesTooLong': + break + default: + addUpdate(updates) + break } return ret @@ -401,7 +431,7 @@ async function _loadDifference( _: 'updates.getDifference', pts: this._pts!, date: this._date!, - qts: 0, + qts: this._qts!, }) switch (diff._) { @@ -449,6 +479,7 @@ async function _loadDifference( const cid = extractChannelIdFromUpdate(upd) const pts = 'pts' in upd ? upd.pts : undefined const ptsCount = 'ptsCount' in upd ? upd.ptsCount : undefined + const qts = 'qts' in upd ? upd.qts : undefined if (cid && pts !== undefined && ptsCount !== undefined) { // check that this pts is in fact the next one @@ -482,14 +513,16 @@ async function _loadDifference( this._cptsMod[cid] = pts } - if (noDispatch && pts) { - if (noDispatch.pts[cid ?? 0]?.[pts]) continue + if (noDispatch) { + if (pts && noDispatch.pts[cid ?? 0]?.[pts]) continue + if (qts && noDispatch.qts[qts]) continue } this._dispatchUpdate(upd, users, chats) } this._pts = state.pts + this._qts = state.qts this._date = state.date if (diff._ === 'updates.difference') return @@ -570,7 +603,10 @@ async function _loadChannelDifference( if (pts && noDispatch.pts[channelId]?.[pts]) return } - if (upd._ === 'updateNewChannelMessage' && upd.message._ === 'messageEmpty') + if ( + upd._ === 'updateNewChannelMessage' && + upd.message._ === 'messageEmpty' + ) return this._dispatchUpdate(upd, users, chats) @@ -585,6 +621,136 @@ async function _loadChannelDifference( this._cptsMod[channelId] = pts } +async function _processSingleUpdate( + this: TelegramClient, + upd: tl.TypeUpdate, + peers: { + users: UsersIndex + chats: ChatsIndex + } | null, + noDispatch?: boolean +): Promise { + const channelId = extractChannelIdFromUpdate(upd) + const pts = 'pts' in upd ? upd.pts : undefined + const ptsCount = 'ptsCount' in upd ? upd.ptsCount : undefined + const qts = 'qts' in upd ? upd.qts : undefined + + if (pts !== undefined && ptsCount !== undefined) { + let nextLocalPts: number | null = null + if (channelId === undefined) nextLocalPts = this._pts! + ptsCount + else if (channelId in this._cpts) + nextLocalPts = this._cpts[channelId] + ptsCount + else if (this._catchUpChannels) { + // only load stored channel pts in case + // the user has enabled catching up. + // not loading stored pts effectively disables + // catching up, but doesn't interfere with further + // update gaps + + const saved = await this.storage.getChannelPts(channelId) + if (saved) { + this._cpts[channelId] = saved + nextLocalPts = saved + ptsCount + } + } + + if (nextLocalPts) { + if (nextLocalPts > pts) + // "the update was already applied, and must be ignored" + return + if (nextLocalPts < pts) { + if (channelId) { + // "there's an update gap that must be filled" + await _loadChannelDifference.call( + this, + channelId, + noDispatch ? _createNoDispatchIndex(upd) : undefined, + pts + ) + } else { + await _loadDifference.call( + this, + noDispatch ? _createNoDispatchIndex(upd) : undefined + ) + } + return + } + } + } + + if (qts !== undefined) { + // qts is only used for non-channel updates + const nextLocalQts = this._qts! + 1 + + if (nextLocalQts > qts) + // "the update was already applied, and must be ignored" + return + if (nextLocalQts < qts) + return await _loadDifference.call( + this, + noDispatch ? _createNoDispatchIndex(upd) : undefined + ) + } + + if (isDummyUpdate(upd) || noDispatch) { + // we needed to check pts/qts, so we couldn't return right away + return + } + + // updates that are also used internally + switch (upd._) { + case 'updateDcOptions': + if (!this._config) { + this._config = await this.call({ _: 'help.getConfig' }) + } else { + ;(this._config as tl.Mutable).dcOptions = + upd.dcOptions + } + break + case 'updateConfig': + this._config = await this.call({ _: 'help.getConfig' }) + break + case 'updateUserName': + if (upd.userId === this._userId) { + this._selfUsername = upd.username || null + } + break + } + + // all checks passed, dispatch the update + + if (!noDispatch) { + if (!peers) { + // this is a short update, let's fetch cached peers + peers = await _fetchPeersForShort.call( + this, + upd + ) + if (!peers) { + // some peer is not cached. + // need to re-fetch the thing, and cache them on the way + return await _loadDifference.call(this) + } + } + + this._dispatchUpdate(upd, peers.users, peers.chats) + } + + // update local pts/qts + if (pts) { + if (channelId) { + this._cpts[channelId] = pts + this._cptsMod[channelId] = pts + } else { + this._pts = pts + } + } + + if (qts) { + this._qts = qts + } +} + /** * @internal */ @@ -607,10 +773,6 @@ export function _handleUpdate( // additionally, locking here blocks updates handling while we are // loading difference inside update handler. - const noDispatchIndex = noDispatch - ? _createNoDispatchIndex(update) - : undefined - this._updLock .acquire() .then(async () => { @@ -619,9 +781,14 @@ export function _handleUpdate( // i tried my best to follow the documentation, but i still may have missed something. // feel free to contribute! // reference: https://core.telegram.org/api/updates + // (though it is out of date: https://t.me/tdlibchat/20155) switch (update._) { - case 'updatesTooLong': // "there are too many events pending to be pushed to the client", we need to fetch them manually - await _loadDifference.call(this, noDispatchIndex) + case 'updatesTooLong': + // "there are too many events pending to be pushed to the client", we need to fetch them manually + await _loadDifference.call( + this, + noDispatch ? _createNoDispatchIndex(update) : undefined + ) break case 'updates': case 'updatesCombined': { @@ -659,125 +826,51 @@ export function _handleUpdate( } } - const { users, chats } = createUsersChatsIndex(update) + const peers = createUsersChatsIndex(update) for (const upd of update.updates) { if (upd._ === 'updateChannelTooLong') { await _loadChannelDifference.call( this, upd.channelId, - noDispatchIndex, + undefined, // noDispatchIndex, upd.pts ) continue } - const channelId = extractChannelIdFromUpdate(upd) - const pts = 'pts' in upd ? upd.pts : undefined - const ptsCount = - 'ptsCount' in upd ? upd.ptsCount : undefined - - if (pts !== undefined && ptsCount !== undefined) { - let nextLocalPts: number | null = null - if (channelId === undefined) - nextLocalPts = this._pts! + ptsCount - else if (channelId in this._cpts) - nextLocalPts = this._cpts[channelId] + ptsCount - else if (this._catchUpChannels) { - // only load stored channel pts in case - // the user has enabled catching up. - // not loading stored pts effectively disables - // catching up, but doesn't interfere with further - // update gaps - - const saved = await this.storage.getChannelPts( - channelId - ) - if (saved) { - this._cpts[channelId] = saved - nextLocalPts = saved + ptsCount - } - } - - if (nextLocalPts) { - if (nextLocalPts > pts) - // "the update was already applied, and must be ignored" - return - if (nextLocalPts < pts) - if (channelId) { - // "there's an update gap that must be filled" - await _loadChannelDifference.call( - this, - channelId, - noDispatchIndex, - pts - ) - continue - } else { - return await _loadDifference.call(this) - } - } - - if (!isDummyUpdate(upd) && !noDispatch) { - this._dispatchUpdate(upd, users, chats) - } - - if (channelId) { - this._cpts[channelId] = pts - this._cptsMod[channelId] = pts - } else { - this._pts = pts - } - } else if (!noDispatch) { - this._dispatchUpdate(upd, users, chats) - } + await _processSingleUpdate.call( + this, + upd, + peers, + noDispatch + ) } - if (!isDummyUpdates(update)) { - if (update.seq !== 0) this._seq = update.seq + if (update.seq !== 0 && update.seq > this._seq!) { + // https://t.me/tdlibchat/5844 + // we also need to check that update seq > this._seq in case + // there was a gap that was filled inside _processSingleUpdate + this._seq = update.seq this._date = update.date } break } case 'updateShort': { const upd = update.update - if (upd._ === 'updateDcOptions' && this._config) { - ;(this._config as tl.Mutable).dcOptions = - upd.dcOptions - } else if (upd._ === 'updateConfig') { - this._config = await this.call({ _: 'help.getConfig' }) - } else if (upd._ === 'updateUserName' && upd.userId === this._userId) { - this._selfUsername = upd.username || null - } else { - if (!noDispatch) { - const peers = await _fetchPeersForShort.call( - this, - upd - ) - if (!peers) { - // some peer is not cached. - // need to re-fetch the thing, and cache them on the way - return await _loadDifference.call(this) - } - this._dispatchUpdate(upd, peers.users, peers.chats) - } - } + await _processSingleUpdate.call( + this, + upd, + null, + noDispatch + ) this._date = update.date + break } case 'updateShortMessage': { - if (noDispatch) return - - const nextLocalPts = this._pts! + update.ptsCount - if (nextLocalPts > update.pts) - // "the update was already applied, and must be ignored" - return - if (nextLocalPts < update.pts) - // "there's an update gap that must be filled" - return await _loadDifference.call(this) - const message: tl.RawMessage = { _: 'message', out: update.out, @@ -802,30 +895,23 @@ export function _handleUpdate( ttlPeriod: update.ttlPeriod, } - const peers = await _fetchPeersForShort.call(this, message) - if (!peers) { - // some peer is not cached. - // need to re-fetch the thing, and cache them on the way - return await _loadDifference.call(this) + const upd: tl.RawUpdateNewMessage = { + _: 'updateNewMessage', + message, + pts: update.pts, + ptsCount: update.ptsCount } - this._date = update.date - this._pts = update.pts + await _processSingleUpdate.call( + this, + upd, + null, + noDispatch + ) - this._dispatchUpdate(message, peers.users, peers.chats) break } case 'updateShortChatMessage': { - if (noDispatch) return - - const nextLocalPts = this._pts! + update.ptsCount - if (nextLocalPts > update.pts) - // "the update was already applied, and must be ignored" - return - if (nextLocalPts < update.pts) - // "there's an update gap that must be filled" - return await _loadDifference.call(this) - const message: tl.RawMessage = { _: 'message', out: update.out, @@ -850,17 +936,20 @@ export function _handleUpdate( ttlPeriod: update.ttlPeriod, } - const peers = await _fetchPeersForShort.call(this, message) - if (!peers) { - // some peer is not cached. - // need to re-fetch the thing, and cache them on the way - return await _loadDifference.call(this) + const upd: tl.RawUpdateNewMessage = { + _: 'updateNewMessage', + message, + pts: update.pts, + ptsCount: update.ptsCount } - this._date = update.date - this._pts = update.pts + await _processSingleUpdate.call( + this, + upd, + null, + noDispatch + ) - this._dispatchUpdate(message, peers.users, peers.chats) break } case 'updateShortSentMessage': { diff --git a/packages/core/src/storage/abstract.ts b/packages/core/src/storage/abstract.ts index d126e074..8de6b0ff 100644 --- a/packages/core/src/storage/abstract.ts +++ b/packages/core/src/storage/abstract.ts @@ -102,14 +102,18 @@ export interface ITelegramStorage { /** * Get updates state (if available), represented as a tuple - * containing: `pts, date, seq` + * containing: `pts, qts, date, seq` */ - getUpdatesState(): MaybeAsync<[number, number, number] | null> + getUpdatesState(): MaybeAsync<[number, number, number, number] | null> /** * Set common `pts` value */ setUpdatesPts(val: number): MaybeAsync + /** + * Set common `qts` value + */ + setUpdatesQts(val: number): MaybeAsync /** * Set updates `date` value */ diff --git a/packages/core/src/storage/memory.ts b/packages/core/src/storage/memory.ts index fa416323..0521c18c 100644 --- a/packages/core/src/storage/memory.ts +++ b/packages/core/src/storage/memory.ts @@ -22,8 +22,8 @@ interface MemorySessionState { // username -> peer id usernameIndex: Record - // common pts, date, seq - gpts: [number, number, number] | null + // common pts, date, seq, qts + gpts: [number, number, number, number] | null // channel pts pts: Record @@ -283,25 +283,30 @@ export class MemoryStorage implements ITelegramStorage /*, IStateStorage */ { return this._state.pts[entityId] ?? null } - getUpdatesState(): MaybeAsync<[number, number, number] | null> { + getUpdatesState(): MaybeAsync<[number, number, number, number] | null> { return this._state.gpts ?? null } setUpdatesPts(val: number): MaybeAsync { - if (!this._state.gpts) this._state.gpts = [0, 0, 0] + if (!this._state.gpts) this._state.gpts = [0, 0, 0, 0] this._state.gpts[0] = val } - setUpdatesDate(val: number): MaybeAsync { - if (!this._state.gpts) this._state.gpts = [0, 0, 0] + setUpdatesQts(val: number): MaybeAsync { + if (!this._state.gpts) this._state.gpts = [0, 0, 0, 0] this._state.gpts[1] = val } - setUpdatesSeq(val: number): MaybeAsync { - if (!this._state.gpts) this._state.gpts = [0, 0, 0] + setUpdatesDate(val: number): MaybeAsync { + if (!this._state.gpts) this._state.gpts = [0, 0, 0, 0] this._state.gpts[2] = val } + setUpdatesSeq(val: number): MaybeAsync { + if (!this._state.gpts) this._state.gpts = [0, 0, 0, 0] + this._state.gpts[3] = val + } + getFullPeerById(id: number): tl.TypeUser | tl.TypeChat | null { return this._cachedFull.get(id) ?? null }