feat(core): support seq in updates
This commit is contained in:
parent
910361ccab
commit
2660120e8f
3 changed files with 37 additions and 38 deletions
|
@ -14,12 +14,7 @@ import { ChatsIndex, UsersIndex } from '../types'
|
|||
|
||||
const debug = require('debug')('mtcute:upds')
|
||||
|
||||
// i tried to implement updates seq, but that thing seems to be
|
||||
// broken on the server side, lol (see https://t.me/teispam/1199, ru)
|
||||
// tldr server sends multiple `updates` with the same seq, and that seq
|
||||
// is also larger than the seq in the initial updates.getState response
|
||||
|
||||
// also code in this file is very bad, thanks to Telegram's awesome updates mechanism
|
||||
// code in this file is very bad, thanks to Telegram's awesome updates mechanism
|
||||
|
||||
// @extension
|
||||
interface UpdatesState {
|
||||
|
@ -30,7 +25,8 @@ interface UpdatesState {
|
|||
// every time session is loaded & saved.
|
||||
_pts: number
|
||||
_date: number
|
||||
// _seq: number
|
||||
_seq: number
|
||||
|
||||
_cpts: Record<number, number>
|
||||
_cptsMod: Record<number, number>
|
||||
}
|
||||
|
@ -59,12 +55,12 @@ export async function _fetchUpdatesState(this: TelegramClient): Promise<void> {
|
|||
const state = await this.call({ _: 'updates.getState' })
|
||||
this._pts = state.pts
|
||||
this._date = state.date
|
||||
// this._seq = state.seq
|
||||
this._seq = state.seq
|
||||
debug(
|
||||
'loaded initial state: pts=%d, date=%d', // , seq=%d',
|
||||
'loaded initial state: pts=%d, date=%d, seq=%d',
|
||||
state.pts,
|
||||
state.date
|
||||
// state.seq
|
||||
state.date,
|
||||
state.seq
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -78,7 +74,7 @@ export async function _loadStorage(this: TelegramClient): Promise<void> {
|
|||
if (state) {
|
||||
this._pts = state[0]
|
||||
this._date = state[1]
|
||||
// this._seq = state[2]
|
||||
this._seq = state[2]
|
||||
}
|
||||
// if no state, don't bother initializing properties
|
||||
// since that means that there is no authorization,
|
||||
|
@ -100,7 +96,7 @@ export async function _saveStorage(this: TelegramClient): Promise<void> {
|
|||
try {
|
||||
// before any authorization pts will be undefined
|
||||
if (this._pts !== undefined) {
|
||||
await this.storage.setCommonPts([this._pts, this._date]) // , this._seq])
|
||||
await this.storage.setCommonPts([this._pts, this._date, this._seq])
|
||||
await this.storage.setManyChannelPts(this._cptsMod)
|
||||
this._cptsMod = {}
|
||||
}
|
||||
|
@ -422,22 +418,25 @@ export function _handleUpdate(
|
|||
break
|
||||
case 'updates':
|
||||
case 'updatesCombined': {
|
||||
// const seqStart =
|
||||
// update._ === 'updatesCombined'
|
||||
// ? update.seqStart
|
||||
// : update.seq
|
||||
// const nextLocalSeq = this._seq + 1
|
||||
//
|
||||
// debug('received %s (seq_start=%d, seq_end=%d)', update._, seqStart, update.seq)
|
||||
//
|
||||
// if (nextLocalSeq > seqStart)
|
||||
// // "the updates were already applied, and must be ignored"
|
||||
// return
|
||||
// if (nextLocalSeq < seqStart)
|
||||
// // "there's an updates gap that must be filled"
|
||||
// // loading difference will also load any updates contained
|
||||
// // in this update, so we discard it
|
||||
// return await _loadDifference.call(this)
|
||||
const seqStart =
|
||||
update._ === 'updatesCombined'
|
||||
? update.seqStart
|
||||
: update.seq
|
||||
if (seqStart !== 0) {
|
||||
// https://t.me/tdlibchat/5843
|
||||
const nextLocalSeq = this._seq + 1
|
||||
|
||||
debug('received %s (seq_start=%d, seq_end=%d)', update._, seqStart, update.seq)
|
||||
|
||||
if (nextLocalSeq > seqStart)
|
||||
// "the updates were already applied, and must be ignored"
|
||||
return
|
||||
if (nextLocalSeq < seqStart)
|
||||
// "there's an updates gap that must be filled"
|
||||
// loading difference will also load any updates contained
|
||||
// in this update, so we discard it
|
||||
return await _loadDifference.call(this)
|
||||
}
|
||||
|
||||
await this._cachePeersFrom(update)
|
||||
const { users, chats } = createUsersChatsIndex(update)
|
||||
|
@ -513,7 +512,7 @@ export function _handleUpdate(
|
|||
}
|
||||
|
||||
if (!isDummyUpdates(update)) {
|
||||
// this._seq = update.seq
|
||||
this._seq = update.seq
|
||||
this._date = update.date
|
||||
}
|
||||
break
|
||||
|
|
|
@ -103,17 +103,17 @@ export interface ITelegramStorage {
|
|||
getPeerByPhone(phone: string): MaybeAsync<tl.TypeInputPeer | null>
|
||||
|
||||
/**
|
||||
* Get common `pts` and `date` values (if available)
|
||||
* Get common `pts`, `date` and `seq` values (if available)
|
||||
*/
|
||||
getCommonPts(): MaybeAsync<[number, number] | null>
|
||||
getCommonPts(): MaybeAsync<[number, number, number] | null>
|
||||
/**
|
||||
* Get channel `pts` value
|
||||
*/
|
||||
getChannelPts(entityId: number): MaybeAsync<number | null>
|
||||
/**
|
||||
* Set common `pts` and `date` values
|
||||
* Set common `pts`, `date` and `seq` values
|
||||
*/
|
||||
setCommonPts(val: [number, number]): MaybeAsync<void>
|
||||
setCommonPts(val: [number, number, number]): MaybeAsync<void>
|
||||
/**
|
||||
* Set channels `pts` values in batch.
|
||||
* Storage is supposed to replace stored channel `pts` values
|
||||
|
|
|
@ -19,8 +19,8 @@ interface MemorySessionState {
|
|||
// username -> peer id
|
||||
usernameIndex: Record<string, number>
|
||||
|
||||
// common pts, date
|
||||
gpts: [number, number] | null
|
||||
// common pts, date, seq
|
||||
gpts: [number, number, number] | null
|
||||
// channel pts
|
||||
pts: Record<number, number>
|
||||
|
||||
|
@ -208,11 +208,11 @@ export class MemoryStorage implements ITelegramStorage {
|
|||
return this._state.pts[entityId] ?? null
|
||||
}
|
||||
|
||||
setCommonPts(val: [number, number]): void {
|
||||
setCommonPts(val: [number, number, number]): void {
|
||||
this._state.gpts = val
|
||||
}
|
||||
|
||||
getCommonPts(): [number, number] | null {
|
||||
getCommonPts(): [number, number, number] | null {
|
||||
return this._state.gpts ?? null
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue