fix(client): reworked updates, added support for qts

This commit is contained in:
teidesu 2021-08-04 12:58:57 +03:00
parent 781384cb86
commit a3cf0c526a
4 changed files with 261 additions and 161 deletions

View file

@ -3479,9 +3479,11 @@ export class TelegramClient extends BaseTelegramClient {
protected _defaultParseMode: string | null protected _defaultParseMode: string | null
protected _updLock: AsyncLock protected _updLock: AsyncLock
protected _pts?: number protected _pts?: number
protected _qts?: number
protected _date?: number protected _date?: number
protected _seq?: number protected _seq?: number
protected _oldPts?: number protected _oldPts?: number
protected _oldQts?: number
protected _oldDate?: number protected _oldDate?: number
protected _oldSeq?: number protected _oldSeq?: number
protected _selfChanged: boolean protected _selfChanged: boolean

View file

@ -28,12 +28,14 @@ interface UpdatesState {
// so store everything here, and load & save // so store everything here, and load & save
// every time session is loaded & saved. // every time session is loaded & saved.
_pts?: number _pts?: number
_qts?: number
_date?: number _date?: number
_seq?: number _seq?: number
// old values of the updates state (i.e. as in DB) // old values of the updates state (i.e. as in DB)
// used to avoid redundant storage calls // used to avoid redundant storage calls
_oldPts?: number _oldPts?: number
_oldQts?: number
_oldDate?: number _oldDate?: number
_oldSeq?: number _oldSeq?: number
_selfChanged: boolean _selfChanged: boolean
@ -70,12 +72,14 @@ function _initializeUpdates(this: TelegramClient) {
*/ */
export async function _fetchUpdatesState(this: TelegramClient): Promise<void> { export async function _fetchUpdatesState(this: TelegramClient): Promise<void> {
const state = await this.call({ _: 'updates.getState' }) const state = await this.call({ _: 'updates.getState' })
this._qts = state.qts
this._pts = state.pts this._pts = state.pts
this._date = state.date this._date = state.date
this._seq = state.seq this._seq = state.seq
debug( debug(
'loaded initial state: pts=%d, date=%d, seq=%d', 'loaded initial state: pts=%d, qts=%d, date=%d, seq=%d',
state.pts, state.pts,
state.qts,
state.date, state.date,
state.seq state.seq
) )
@ -90,8 +94,9 @@ export async function _loadStorage(this: TelegramClient): Promise<void> {
const state = await this.storage.getUpdatesState() const state = await this.storage.getUpdatesState()
if (state) { if (state) {
this._pts = this._oldPts = state[0] this._pts = this._oldPts = state[0]
this._date = this._oldDate = state[1] this._qts = this._oldQts = state[1]
this._seq = this._oldSeq = state[2] this._date = this._oldDate = state[2]
this._seq = this._oldSeq = state[3]
} }
// if no state, don't bother initializing properties // if no state, don't bother initializing properties
// since that means that there is no authorization, // since that means that there is no authorization,
@ -128,6 +133,8 @@ export async function _saveStorage(
// if old* value is not available, assume it has changed. // if old* value is not available, assume it has changed.
if (this._oldPts === undefined || this._oldPts !== this._pts) if (this._oldPts === undefined || this._oldPts !== this._pts)
await this.storage.setUpdatesPts(this._pts) await this.storage.setUpdatesPts(this._pts)
if (this._oldQts === undefined || this._oldQts !== this._qts)
await this.storage.setUpdatesPts(this._qts!)
if (this._oldDate === undefined || this._oldDate !== this._date) if (this._oldDate === undefined || this._oldDate !== this._date)
await this.storage.setUpdatesDate(this._date!) await this.storage.setUpdatesDate(this._date!)
if (this._oldSeq === undefined || this._oldSeq !== this._seq) if (this._oldSeq === undefined || this._oldSeq !== this._seq)
@ -135,6 +142,7 @@ export async function _saveStorage(
// update old* values // update old* values
this._oldPts = this._pts this._oldPts = this._pts
this._oldQts = this._qts
this._oldDate = this._date this._oldDate = this._date
this._oldSeq = this._seq this._oldSeq = this._seq
@ -178,39 +186,53 @@ interface NoDispatchIndex {
msg: Record<number, Record<number, true>> msg: Record<number, Record<number, true>>
// channel id or 0 => pts // channel id or 0 => pts
pts: Record<number, Record<number, true>> pts: Record<number, Record<number, true>>
qts: Record<number, true>
} }
// creating and using a no-dispatch index is pretty expensive, // creating and using a no-dispatch index is pretty expensive,
// but its not a big deal since it's actually rarely needed // but its not a big deal since it's actually rarely needed
function _createNoDispatchIndex( function _createNoDispatchIndex(
updates?: tl.TypeUpdates updates?: tl.TypeUpdates | tl.TypeUpdate
): NoDispatchIndex | undefined { ): NoDispatchIndex | undefined {
if (!updates) return undefined if (!updates) return undefined
const ret: NoDispatchIndex = { const ret: NoDispatchIndex = {
msg: {}, msg: {},
pts: {}, pts: {},
qts: {},
}
function addUpdate(upd: tl.TypeUpdate) {
const cid = extractChannelIdFromUpdate(upd) ?? 0
const pts = 'pts' in upd ? upd.pts : undefined
if (pts) {
if (!ret.pts[cid]) ret.pts[cid] = {}
ret.pts[cid][pts] = true
}
const qts = 'qts' in upd ? upd.qts : undefined
if (qts) {
ret.qts[qts] = true
}
switch (upd._) {
case 'updateNewMessage':
case 'updateNewChannelMessage': {
const cid =
upd.message.peerId?._ === 'peerChannel'
? upd.message.peerId.channelId
: 0
if (!ret.msg[cid]) ret.msg[cid] = {}
ret.msg[cid][upd.message.id] = true
break
}
}
} }
switch (updates._) { switch (updates._) {
case 'updates': case 'updates':
case 'updatesCombined': case 'updatesCombined':
updates.updates.forEach((upd) => { updates.updates.forEach(addUpdate)
const cid = extractChannelIdFromUpdate(upd) ?? 0
switch (upd._) {
case 'updateNewMessage':
case 'updateNewChannelMessage':
if (!ret.msg[cid]) ret.msg[cid] = {}
ret.msg[cid][upd.message.id] = true
break
}
const pts = 'pts' in upd ? upd.pts : undefined
if (pts) {
if (!ret.msg[cid]) ret.msg[cid] = {}
ret.msg[cid][pts] = true
}
})
break break
case 'updateShortMessage': case 'updateShortMessage':
case 'updateShortChatMessage': case 'updateShortChatMessage':
@ -222,6 +244,14 @@ function _createNoDispatchIndex(
ret.msg[0][updates.id] = true ret.msg[0][updates.id] = true
ret.pts[0][updates.pts] = true ret.pts[0][updates.pts] = true
break break
case 'updateShort':
addUpdate(updates.update)
break
case 'updatesTooLong':
break
default:
addUpdate(updates)
break
} }
return ret return ret
@ -401,7 +431,7 @@ async function _loadDifference(
_: 'updates.getDifference', _: 'updates.getDifference',
pts: this._pts!, pts: this._pts!,
date: this._date!, date: this._date!,
qts: 0, qts: this._qts!,
}) })
switch (diff._) { switch (diff._) {
@ -449,6 +479,7 @@ async function _loadDifference(
const cid = extractChannelIdFromUpdate(upd) const cid = extractChannelIdFromUpdate(upd)
const pts = 'pts' in upd ? upd.pts : undefined const pts = 'pts' in upd ? upd.pts : undefined
const ptsCount = 'ptsCount' in upd ? upd.ptsCount : undefined const ptsCount = 'ptsCount' in upd ? upd.ptsCount : undefined
const qts = 'qts' in upd ? upd.qts : undefined
if (cid && pts !== undefined && ptsCount !== undefined) { if (cid && pts !== undefined && ptsCount !== undefined) {
// check that this pts is in fact the next one // check that this pts is in fact the next one
@ -482,14 +513,16 @@ async function _loadDifference(
this._cptsMod[cid] = pts this._cptsMod[cid] = pts
} }
if (noDispatch && pts) { if (noDispatch) {
if (noDispatch.pts[cid ?? 0]?.[pts]) continue if (pts && noDispatch.pts[cid ?? 0]?.[pts]) continue
if (qts && noDispatch.qts[qts]) continue
} }
this._dispatchUpdate(upd, users, chats) this._dispatchUpdate(upd, users, chats)
} }
this._pts = state.pts this._pts = state.pts
this._qts = state.qts
this._date = state.date this._date = state.date
if (diff._ === 'updates.difference') return if (diff._ === 'updates.difference') return
@ -570,7 +603,10 @@ async function _loadChannelDifference(
if (pts && noDispatch.pts[channelId]?.[pts]) return if (pts && noDispatch.pts[channelId]?.[pts]) return
} }
if (upd._ === 'updateNewChannelMessage' && upd.message._ === 'messageEmpty') if (
upd._ === 'updateNewChannelMessage' &&
upd.message._ === 'messageEmpty'
)
return return
this._dispatchUpdate(upd, users, chats) this._dispatchUpdate(upd, users, chats)
@ -585,6 +621,136 @@ async function _loadChannelDifference(
this._cptsMod[channelId] = pts this._cptsMod[channelId] = pts
} }
async function _processSingleUpdate(
this: TelegramClient,
upd: tl.TypeUpdate,
peers: {
users: UsersIndex
chats: ChatsIndex
} | null,
noDispatch?: boolean
): Promise<void> {
const channelId = extractChannelIdFromUpdate(upd)
const pts = 'pts' in upd ? upd.pts : undefined
const ptsCount = 'ptsCount' in upd ? upd.ptsCount : undefined
const qts = 'qts' in upd ? upd.qts : undefined
if (pts !== undefined && ptsCount !== undefined) {
let nextLocalPts: number | null = null
if (channelId === undefined) nextLocalPts = this._pts! + ptsCount
else if (channelId in this._cpts)
nextLocalPts = this._cpts[channelId] + ptsCount
else if (this._catchUpChannels) {
// only load stored channel pts in case
// the user has enabled catching up.
// not loading stored pts effectively disables
// catching up, but doesn't interfere with further
// update gaps
const saved = await this.storage.getChannelPts(channelId)
if (saved) {
this._cpts[channelId] = saved
nextLocalPts = saved + ptsCount
}
}
if (nextLocalPts) {
if (nextLocalPts > pts)
// "the update was already applied, and must be ignored"
return
if (nextLocalPts < pts) {
if (channelId) {
// "there's an update gap that must be filled"
await _loadChannelDifference.call(
this,
channelId,
noDispatch ? _createNoDispatchIndex(upd) : undefined,
pts
)
} else {
await _loadDifference.call(
this,
noDispatch ? _createNoDispatchIndex(upd) : undefined
)
}
return
}
}
}
if (qts !== undefined) {
// qts is only used for non-channel updates
const nextLocalQts = this._qts! + 1
if (nextLocalQts > qts)
// "the update was already applied, and must be ignored"
return
if (nextLocalQts < qts)
return await _loadDifference.call(
this,
noDispatch ? _createNoDispatchIndex(upd) : undefined
)
}
if (isDummyUpdate(upd) || noDispatch) {
// we needed to check pts/qts, so we couldn't return right away
return
}
// updates that are also used internally
switch (upd._) {
case 'updateDcOptions':
if (!this._config) {
this._config = await this.call({ _: 'help.getConfig' })
} else {
;(this._config as tl.Mutable<tl.TypeConfig>).dcOptions =
upd.dcOptions
}
break
case 'updateConfig':
this._config = await this.call({ _: 'help.getConfig' })
break
case 'updateUserName':
if (upd.userId === this._userId) {
this._selfUsername = upd.username || null
}
break
}
// all checks passed, dispatch the update
if (!noDispatch) {
if (!peers) {
// this is a short update, let's fetch cached peers
peers = await _fetchPeersForShort.call(
this,
upd
)
if (!peers) {
// some peer is not cached.
// need to re-fetch the thing, and cache them on the way
return await _loadDifference.call(this)
}
}
this._dispatchUpdate(upd, peers.users, peers.chats)
}
// update local pts/qts
if (pts) {
if (channelId) {
this._cpts[channelId] = pts
this._cptsMod[channelId] = pts
} else {
this._pts = pts
}
}
if (qts) {
this._qts = qts
}
}
/** /**
* @internal * @internal
*/ */
@ -607,10 +773,6 @@ export function _handleUpdate(
// additionally, locking here blocks updates handling while we are // additionally, locking here blocks updates handling while we are
// loading difference inside update handler. // loading difference inside update handler.
const noDispatchIndex = noDispatch
? _createNoDispatchIndex(update)
: undefined
this._updLock this._updLock
.acquire() .acquire()
.then(async () => { .then(async () => {
@ -619,9 +781,14 @@ export function _handleUpdate(
// i tried my best to follow the documentation, but i still may have missed something. // i tried my best to follow the documentation, but i still may have missed something.
// feel free to contribute! // feel free to contribute!
// reference: https://core.telegram.org/api/updates // reference: https://core.telegram.org/api/updates
// (though it is out of date: https://t.me/tdlibchat/20155)
switch (update._) { switch (update._) {
case 'updatesTooLong': // "there are too many events pending to be pushed to the client", we need to fetch them manually case 'updatesTooLong':
await _loadDifference.call(this, noDispatchIndex) // "there are too many events pending to be pushed to the client", we need to fetch them manually
await _loadDifference.call(
this,
noDispatch ? _createNoDispatchIndex(update) : undefined
)
break break
case 'updates': case 'updates':
case 'updatesCombined': { case 'updatesCombined': {
@ -659,125 +826,51 @@ export function _handleUpdate(
} }
} }
const { users, chats } = createUsersChatsIndex(update) const peers = createUsersChatsIndex(update)
for (const upd of update.updates) { for (const upd of update.updates) {
if (upd._ === 'updateChannelTooLong') { if (upd._ === 'updateChannelTooLong') {
await _loadChannelDifference.call( await _loadChannelDifference.call(
this, this,
upd.channelId, upd.channelId,
noDispatchIndex, undefined, // noDispatchIndex,
upd.pts upd.pts
) )
continue continue
} }
const channelId = extractChannelIdFromUpdate(upd) await _processSingleUpdate.call(
const pts = 'pts' in upd ? upd.pts : undefined
const ptsCount =
'ptsCount' in upd ? upd.ptsCount : undefined
if (pts !== undefined && ptsCount !== undefined) {
let nextLocalPts: number | null = null
if (channelId === undefined)
nextLocalPts = this._pts! + ptsCount
else if (channelId in this._cpts)
nextLocalPts = this._cpts[channelId] + ptsCount
else if (this._catchUpChannels) {
// only load stored channel pts in case
// the user has enabled catching up.
// not loading stored pts effectively disables
// catching up, but doesn't interfere with further
// update gaps
const saved = await this.storage.getChannelPts(
channelId
)
if (saved) {
this._cpts[channelId] = saved
nextLocalPts = saved + ptsCount
}
}
if (nextLocalPts) {
if (nextLocalPts > pts)
// "the update was already applied, and must be ignored"
return
if (nextLocalPts < pts)
if (channelId) {
// "there's an update gap that must be filled"
await _loadChannelDifference.call(
this, this,
channelId, upd,
noDispatchIndex, peers,
pts noDispatch
) )
continue
} else {
return await _loadDifference.call(this)
}
} }
if (!isDummyUpdate(upd) && !noDispatch) { if (update.seq !== 0 && update.seq > this._seq!) {
this._dispatchUpdate(upd, users, chats) // https://t.me/tdlibchat/5844
} // we also need to check that update seq > this._seq in case
// there was a gap that was filled inside _processSingleUpdate
if (channelId) { this._seq = update.seq
this._cpts[channelId] = pts
this._cptsMod[channelId] = pts
} else {
this._pts = pts
}
} else if (!noDispatch) {
this._dispatchUpdate(upd, users, chats)
}
}
if (!isDummyUpdates(update)) {
if (update.seq !== 0) this._seq = update.seq
this._date = update.date this._date = update.date
} }
break break
} }
case 'updateShort': { case 'updateShort': {
const upd = update.update const upd = update.update
if (upd._ === 'updateDcOptions' && this._config) {
;(this._config as tl.Mutable<tl.TypeConfig>).dcOptions =
upd.dcOptions
} else if (upd._ === 'updateConfig') {
this._config = await this.call({ _: 'help.getConfig' })
} else if (upd._ === 'updateUserName' && upd.userId === this._userId) {
this._selfUsername = upd.username || null
} else {
if (!noDispatch) {
const peers = await _fetchPeersForShort.call(
this,
upd
)
if (!peers) {
// some peer is not cached.
// need to re-fetch the thing, and cache them on the way
return await _loadDifference.call(this)
}
this._dispatchUpdate(upd, peers.users, peers.chats) await _processSingleUpdate.call(
} this,
} upd,
null,
noDispatch
)
this._date = update.date this._date = update.date
break break
} }
case 'updateShortMessage': { case 'updateShortMessage': {
if (noDispatch) return
const nextLocalPts = this._pts! + update.ptsCount
if (nextLocalPts > update.pts)
// "the update was already applied, and must be ignored"
return
if (nextLocalPts < update.pts)
// "there's an update gap that must be filled"
return await _loadDifference.call(this)
const message: tl.RawMessage = { const message: tl.RawMessage = {
_: 'message', _: 'message',
out: update.out, out: update.out,
@ -802,30 +895,23 @@ export function _handleUpdate(
ttlPeriod: update.ttlPeriod, ttlPeriod: update.ttlPeriod,
} }
const peers = await _fetchPeersForShort.call(this, message) const upd: tl.RawUpdateNewMessage = {
if (!peers) { _: 'updateNewMessage',
// some peer is not cached. message,
// need to re-fetch the thing, and cache them on the way pts: update.pts,
return await _loadDifference.call(this) ptsCount: update.ptsCount
} }
this._date = update.date await _processSingleUpdate.call(
this._pts = update.pts this,
upd,
null,
noDispatch
)
this._dispatchUpdate(message, peers.users, peers.chats)
break break
} }
case 'updateShortChatMessage': { case 'updateShortChatMessage': {
if (noDispatch) return
const nextLocalPts = this._pts! + update.ptsCount
if (nextLocalPts > update.pts)
// "the update was already applied, and must be ignored"
return
if (nextLocalPts < update.pts)
// "there's an update gap that must be filled"
return await _loadDifference.call(this)
const message: tl.RawMessage = { const message: tl.RawMessage = {
_: 'message', _: 'message',
out: update.out, out: update.out,
@ -850,17 +936,20 @@ export function _handleUpdate(
ttlPeriod: update.ttlPeriod, ttlPeriod: update.ttlPeriod,
} }
const peers = await _fetchPeersForShort.call(this, message) const upd: tl.RawUpdateNewMessage = {
if (!peers) { _: 'updateNewMessage',
// some peer is not cached. message,
// need to re-fetch the thing, and cache them on the way pts: update.pts,
return await _loadDifference.call(this) ptsCount: update.ptsCount
} }
this._date = update.date await _processSingleUpdate.call(
this._pts = update.pts this,
upd,
null,
noDispatch
)
this._dispatchUpdate(message, peers.users, peers.chats)
break break
} }
case 'updateShortSentMessage': { case 'updateShortSentMessage': {

View file

@ -102,14 +102,18 @@ export interface ITelegramStorage {
/** /**
* Get updates state (if available), represented as a tuple * Get updates state (if available), represented as a tuple
* containing: `pts, date, seq` * containing: `pts, qts, date, seq`
*/ */
getUpdatesState(): MaybeAsync<[number, number, number] | null> getUpdatesState(): MaybeAsync<[number, number, number, number] | null>
/** /**
* Set common `pts` value * Set common `pts` value
*/ */
setUpdatesPts(val: number): MaybeAsync<void> setUpdatesPts(val: number): MaybeAsync<void>
/**
* Set common `qts` value
*/
setUpdatesQts(val: number): MaybeAsync<void>
/** /**
* Set updates `date` value * Set updates `date` value
*/ */

View file

@ -22,8 +22,8 @@ interface MemorySessionState {
// username -> peer id // username -> peer id
usernameIndex: Record<string, number> usernameIndex: Record<string, number>
// common pts, date, seq // common pts, date, seq, qts
gpts: [number, number, number] | null gpts: [number, number, number, number] | null
// channel pts // channel pts
pts: Record<number, number> pts: Record<number, number>
@ -283,25 +283,30 @@ export class MemoryStorage implements ITelegramStorage /*, IStateStorage */ {
return this._state.pts[entityId] ?? null return this._state.pts[entityId] ?? null
} }
getUpdatesState(): MaybeAsync<[number, number, number] | null> { getUpdatesState(): MaybeAsync<[number, number, number, number] | null> {
return this._state.gpts ?? null return this._state.gpts ?? null
} }
setUpdatesPts(val: number): MaybeAsync<void> { setUpdatesPts(val: number): MaybeAsync<void> {
if (!this._state.gpts) this._state.gpts = [0, 0, 0] if (!this._state.gpts) this._state.gpts = [0, 0, 0, 0]
this._state.gpts[0] = val this._state.gpts[0] = val
} }
setUpdatesDate(val: number): MaybeAsync<void> { setUpdatesQts(val: number): MaybeAsync<void> {
if (!this._state.gpts) this._state.gpts = [0, 0, 0] if (!this._state.gpts) this._state.gpts = [0, 0, 0, 0]
this._state.gpts[1] = val this._state.gpts[1] = val
} }
setUpdatesSeq(val: number): MaybeAsync<void> { setUpdatesDate(val: number): MaybeAsync<void> {
if (!this._state.gpts) this._state.gpts = [0, 0, 0] if (!this._state.gpts) this._state.gpts = [0, 0, 0, 0]
this._state.gpts[2] = val this._state.gpts[2] = val
} }
setUpdatesSeq(val: number): MaybeAsync<void> {
if (!this._state.gpts) this._state.gpts = [0, 0, 0, 0]
this._state.gpts[3] = val
}
getFullPeerById(id: number): tl.TypeUser | tl.TypeChat | null { getFullPeerById(id: number): tl.TypeUser | tl.TypeChat | null {
return this._cachedFull.get(id) ?? null return this._cachedFull.get(id) ?? null
} }