feat: support for "updating" states + improved catch up a bit

This commit is contained in:
alina 🌸 2024-03-14 01:22:05 +03:00
parent 83746aa460
commit ab2425a26e
Signed by: teidesu
SSH key fingerprint: SHA256:uNeCpw6aTSU4aIObXLvHfLkDa82HWH9EiOj9AXOIRpI
5 changed files with 126 additions and 93 deletions

View file

@ -37,6 +37,9 @@ export class BaseTelegramClient implements ITelegramClient {
if (!params.disableUpdates && params.updates !== false) { if (!params.disableUpdates && params.updates !== false) {
this.updates = new UpdatesManager(this, params.updates) this.updates = new UpdatesManager(this, params.updates)
this._serverUpdatesHandler = this.updates.handleUpdate.bind(this.updates) this._serverUpdatesHandler = this.updates.handleUpdate.bind(this.updates)
this.updates.onCatchingUp((catchingUp) => {
this._connectionStateHandler(catchingUp ? 'updating' : 'connected')
})
} }
this.mt.on('update', (update: tl.TypeUpdates) => { this.mt.on('update', (update: tl.TypeUpdates) => {

View file

@ -14,9 +14,13 @@ import type { StringSessionData } from './utils/string-session.js'
* - `offline` - not connected (only emitted when {@link ICorePlatform.onNetworkChanged} callback * - `offline` - not connected (only emitted when {@link ICorePlatform.onNetworkChanged} callback
* is called with `false`) * is called with `false`)
* - `connecting` - currently connecting. All requests will be queued until the connection is established * - `connecting` - currently connecting. All requests will be queued until the connection is established
* - `connected` - connected and ready to send requests * - `updating` - connected and is currently updating the state (i.e. downloading missing updates).
* At this point client is already fully operational, but some updates may be missing.
* Is only emitted when updates manager is enabled.
* - `connected` - connected and ready to send requests. When updates manager is enabled, this state
* may be emitted before `updating` state
*/ */
export type ConnectionState = 'offline' | 'connecting' | 'connected' export type ConnectionState = 'offline' | 'connecting' | 'updating' | 'connected'
// NB: when adding new methods, don't forget to add them to: // NB: when adding new methods, don't forget to add them to:
// - worker/port.ts // - worker/port.ts

View file

@ -83,6 +83,7 @@ import {
// } // }
const KEEP_ALIVE_INTERVAL = 15 * 60 * 1000 // 15 minutes const KEEP_ALIVE_INTERVAL = 15 * 60 * 1000 // 15 minutes
const UPDATES_TOO_LONG = { _: 'updatesTooLong' } as const
// todo: fix docs // todo: fix docs
export class UpdatesManager { export class UpdatesManager {
@ -126,7 +127,7 @@ export class UpdatesManager {
oldSeq?: number oldSeq?: number
// whether to catch up channels from the locally stored pts // whether to catch up channels from the locally stored pts
catchUpChannels = false catchingUp = false
catchUpOnStart = this.params.catchUp ?? false catchUpOnStart = this.params.catchUp ?? false
cpts = new Map<number, number>() cpts = new Map<number, number>()
@ -137,6 +138,8 @@ export class UpdatesManager {
log = this.client.log.create('updates') log = this.client.log.create('updates')
private _handler: RawUpdateHandler = () => {} private _handler: RawUpdateHandler = () => {}
private _onCatchingUp: (catchingUp: boolean) => void = () => {}
auth?: CurrentUserInfo | null // todo: do we need a local copy? auth?: CurrentUserInfo | null // todo: do we need a local copy?
keepAliveInterval?: NodeJS.Timeout keepAliveInterval?: NodeJS.Timeout
@ -160,15 +163,17 @@ export class UpdatesManager {
this._handler = handler this._handler = handler
} }
onCatchingUp(handler: (catchingUp: boolean) => void): void {
this._onCatchingUp = handler
}
destroy() { destroy() {
this.stopLoop() this.stopLoop()
} }
notifyLoggedIn(self: CurrentUserInfo): void { notifyLoggedIn(self: CurrentUserInfo): void {
this.auth = self this.auth = self
this._fetchUpdatesState() this.startLoop().catch((err) => this.client.emitError(err))
.then(() => this.startLoop())
.catch((err) => this.client.emitError(err))
} }
notifyLoggedOut(): void { notifyLoggedOut(): void {
@ -184,7 +189,7 @@ export class UpdatesManager {
private _onKeepAlive() { private _onKeepAlive() {
this.log.debug('no updates for >15 minutes, catching up') this.log.debug('no updates for >15 minutes, catching up')
this.handleUpdate({ _: 'updatesTooLong' }) this.handleUpdate(UPDATES_TOO_LONG)
} }
/** /**
@ -197,6 +202,7 @@ export class UpdatesManager {
* *
* > **Note**: If you are using {@link UpdatesManagerParams.catchUp} option, * > **Note**: If you are using {@link UpdatesManagerParams.catchUp} option,
* > catching up will be done in background, you can't await it. * > catching up will be done in background, you can't await it.
* > Instead, listen for the `updating` and `connected` connection state events.
*/ */
async startLoop(): Promise<void> { async startLoop(): Promise<void> {
if (this.updatesLoopActive) return if (this.updatesLoopActive) return
@ -208,6 +214,7 @@ export class UpdatesManager {
// start updates loop in background // start updates loop in background
this.updatesLoopActive = true this.updatesLoopActive = true
clearInterval(this.keepAliveInterval)
this.keepAliveInterval = setInterval(this._onKeepAlive, KEEP_ALIVE_INTERVAL) this.keepAliveInterval = setInterval(this._onKeepAlive, KEEP_ALIVE_INTERVAL)
this._loop().catch((err) => this.client.emitError(err)) this._loop().catch((err) => this.client.emitError(err))
@ -245,16 +252,23 @@ export class UpdatesManager {
/** /**
* Catch up with the server by loading missed updates. * Catch up with the server by loading missed updates.
* *.
* > **Note**: In case the storage was not properly * > **Note**: In case the storage was not properly
* > closed the last time, "catching up" might * > closed the last time, "catching up" might
* > result in duplicate updates. * > result in duplicate updates.
*/ */
catchUp(): void { catchUp(): void {
if (!this.updatesLoopActive) {
this.log.warn('catch up requested, but updates loop is not active, ignoring')
return
}
this.log.debug('catch up requested') this.log.debug('catch up requested')
this.catchUpChannels = true this._onCatchingUp(true)
this.handleUpdate({ _: 'updatesTooLong' }) this.catchingUp = true
this.handleUpdate(UPDATES_TOO_LONG)
} }
handleClientUpdate(update: tl.TypeUpdates, noDispatch = true): void { handleClientUpdate(update: tl.TypeUpdates, noDispatch = true): void {
@ -766,7 +780,7 @@ export class UpdatesManager {
let _pts: number | null | undefined = cpts.get(channelId) let _pts: number | null | undefined = cpts.get(channelId)
if (!_pts && this.catchUpChannels) { if (!_pts && this.catchingUp) {
_pts = await client.storage.updates.getChannelPts(channelId) _pts = await client.storage.updates.getChannelPts(channelId)
} }
if (!_pts) _pts = fallbackPts if (!_pts) _pts = fallbackPts
@ -936,93 +950,94 @@ export class UpdatesManager {
async _fetchDifference(requestedDiff: Map<number, Promise<void>>): Promise<void> { async _fetchDifference(requestedDiff: Map<number, Promise<void>>): Promise<void> {
const { client, log, pendingPtsUpdates, pendingUnorderedUpdates } = this const { client, log, pendingPtsUpdates, pendingUnorderedUpdates } = this
for (;;) { const diff = await client.call({
const diff = await client.call({ _: 'updates.getDifference',
_: 'updates.getDifference', pts: this.pts!,
pts: this.pts!, date: this.date!,
date: this.date!, qts: this.qts!,
qts: this.qts!, })
})
switch (diff._) { switch (diff._) {
case 'updates.differenceEmpty': case 'updates.differenceEmpty':
log.debug('updates.getDifference returned updates.differenceEmpty') log.debug('updates.getDifference returned updates.differenceEmpty')
return return
case 'updates.differenceTooLong': case 'updates.differenceTooLong':
this.pts = diff.pts this.pts = diff.pts
log.debug('updates.getDifference returned updates.differenceTooLong') log.debug('updates.getDifference returned updates.differenceTooLong')
return return
} }
const fetchedState = diff._ === 'updates.difference' ? diff.state : diff.intermediateState const fetchedState = diff._ === 'updates.difference' ? diff.state : diff.intermediateState
log.debug( log.debug(
'updates.getDifference returned %d messages, %d updates. new pts: %d, qts: %d, seq: %d, final: %b', 'updates.getDifference returned %d messages, %d updates. new pts: %d, qts: %d, seq: %d, final: %b',
diff.newMessages.length, diff.newMessages.length,
diff.otherUpdates.length, diff.otherUpdates.length,
fetchedState.pts, fetchedState.pts,
fetchedState.qts, fetchedState.qts,
fetchedState.seq, fetchedState.seq,
diff._ === 'updates.difference', diff._ === 'updates.difference',
) )
const peers = PeersIndex.from(diff) const peers = PeersIndex.from(diff)
diff.newMessages.forEach((message) => { diff.newMessages.forEach((message) => {
log.debug('processing message %d in %j (%s) from common diff', message.id, message.peerId, message._) log.debug('processing message %d in %j (%s) from common diff', message.id, message.peerId, message._)
if (message._ === 'messageEmpty') return if (message._ === 'messageEmpty') return
// pts does not need to be checked for them // pts does not need to be checked for them
pendingUnorderedUpdates.pushBack(toPendingUpdate(messageToUpdate(message), peers, true)) pendingUnorderedUpdates.pushBack(toPendingUpdate(messageToUpdate(message), peers, true))
}) })
diff.otherUpdates.forEach((upd) => {
if (upd._ === 'updateChannelTooLong') {
log.debug(
'received updateChannelTooLong for channel %d in common diff (pts = %d), fetching diff',
upd.channelId,
upd.pts,
)
this._fetchChannelDifferenceLater(requestedDiff, upd.channelId, upd.pts)
return
}
if (isMessageEmpty(upd)) return
const parsed = toPendingUpdate(upd, peers, true)
if (parsed.channelId && parsed.ptsBefore) {
// we need to check pts for these updates, put into pts queue
pendingPtsUpdates.add(parsed)
} else {
// the updates are in order already, we can treat them as unordered
pendingUnorderedUpdates.pushBack(parsed)
}
diff.otherUpdates.forEach((upd) => {
if (upd._ === 'updateChannelTooLong') {
log.debug( log.debug(
'received %s from common diff, cid: %d, pts_before: %d, pts: %d, qts_before: %d', 'received updateChannelTooLong for channel %d in common diff (pts = %d), fetching diff',
upd._, upd.channelId,
parsed.channelId, upd.pts,
parsed.ptsBefore,
parsed.pts,
parsed.qtsBefore,
) )
})
this.pts = fetchedState.pts this._fetchChannelDifferenceLater(requestedDiff, upd.channelId, upd.pts)
this.qts = fetchedState.qts
this.seq = fetchedState.seq
this.date = fetchedState.date
if (diff._ === 'updates.difference') {
return return
} }
if (isMessageEmpty(upd)) return
const parsed = toPendingUpdate(upd, peers, true)
if (parsed.channelId && parsed.ptsBefore) {
// we need to check pts for these updates, put into pts queue
pendingPtsUpdates.add(parsed)
} else {
// the updates are in order already, we can treat them as unordered
pendingUnorderedUpdates.pushBack(parsed)
}
log.debug(
'received %s from common diff, cid: %d, pts_before: %d, pts: %d, qts_before: %d',
upd._,
parsed.channelId,
parsed.ptsBefore,
parsed.pts,
parsed.qtsBefore,
)
})
this.pts = fetchedState.pts
this.qts = fetchedState.qts
this.seq = fetchedState.seq
this.date = fetchedState.date
if (diff._ === 'updates.difference') {
return
} }
// fetch the next chunk in next tick
this.handleUpdate(UPDATES_TOO_LONG)
} }
_fetchDifferenceLater(requestedDiff: Map<number, Promise<void>>): void { _fetchDifferenceLater(requestedDiff: Map<number, Promise<void>>): void {
@ -1293,6 +1308,13 @@ export class UpdatesManager {
this.hasTimedoutPostponed this.hasTimedoutPostponed
) )
) { ) {
if (this.catchingUp) {
// consider catching up completed if there are no more updates
this.log.debug('catching up completed')
this.catchingUp = false
this._onCatchingUp(false)
}
await updatesLoopCv.wait() await updatesLoopCv.wait()
} }
if (!this.updatesLoopActive) break if (!this.updatesLoopActive) break
@ -1309,7 +1331,8 @@ export class UpdatesManager {
const requestedDiff = new Map<number, Promise<void>>() const requestedDiff = new Map<number, Promise<void>>()
// first process pending containers this.log.debug('processing pending containers')
while (pendingUpdateContainers.length) { while (pendingUpdateContainers.length) {
const { upd, seqStart, seqEnd } = pendingUpdateContainers.popFront()! const { upd, seqStart, seqEnd } = pendingUpdateContainers.popFront()!
@ -1515,7 +1538,8 @@ export class UpdatesManager {
} }
} }
// process pts-ordered updates this.log.debug('processing pending pts-ordered updates')
while (pendingPtsUpdates.length) { while (pendingPtsUpdates.length) {
const pending = pendingPtsUpdates.popFront()! const pending = pendingPtsUpdates.popFront()!
const upd = pending.update const upd = pending.update
@ -1527,7 +1551,7 @@ export class UpdatesManager {
if (!pending.channelId) localPts = this.pts! if (!pending.channelId) localPts = this.pts!
else if (cpts.has(pending.channelId)) { else if (cpts.has(pending.channelId)) {
localPts = cpts.get(pending.channelId)! localPts = cpts.get(pending.channelId)!
} else if (this.catchUpChannels) { } else if (this.catchingUp) {
// only load stored channel pts in case // only load stored channel pts in case
// the user has enabled catching up. // the user has enabled catching up.
// not loading stored pts effectively disables // not loading stored pts effectively disables
@ -1616,7 +1640,8 @@ export class UpdatesManager {
await this._onUpdate(pending, requestedDiff) await this._onUpdate(pending, requestedDiff)
} }
// process postponed pts-ordered updates this.log.debug('processing postponed pts-ordered updates')
for (let item = pendingPtsUpdatesPostponed._first; item; item = item.n) { for (let item = pendingPtsUpdatesPostponed._first; item; item = item.n) {
// awesome fucking iteration because i'm so fucking tired and wanna kms // awesome fucking iteration because i'm so fucking tired and wanna kms
const pending = item.v const pending = item.v
@ -1691,7 +1716,8 @@ export class UpdatesManager {
pendingPtsUpdatesPostponed._remove(item) pendingPtsUpdatesPostponed._remove(item)
} }
// process qts-ordered updates this.log.debug('processing pending qts-ordered updates')
while (pendingQtsUpdates.length) { while (pendingQtsUpdates.length) {
const pending = pendingQtsUpdates.popFront()! const pending = pendingQtsUpdates.popFront()!
const upd = pending.update const upd = pending.update
@ -1745,7 +1771,8 @@ export class UpdatesManager {
await this._onUpdate(pending, requestedDiff) await this._onUpdate(pending, requestedDiff)
} }
// process postponed qts-ordered updates this.log.debug('processing postponed qts-ordered updates')
for (let item = pendingQtsUpdatesPostponed._first; item; item = item.n) { for (let item = pendingQtsUpdatesPostponed._first; item; item = item.n) {
// awesome fucking iteration because i'm so fucking tired and wanna kms // awesome fucking iteration because i'm so fucking tired and wanna kms
const pending = item.v const pending = item.v
@ -1796,7 +1823,6 @@ export class UpdatesManager {
this.hasTimedoutPostponed = false this.hasTimedoutPostponed = false
// wait for all pending diffs to load
while (requestedDiff.size) { while (requestedDiff.size) {
log.debug( log.debug(
'waiting for %d pending diffs before processing unordered: %J', 'waiting for %d pending diffs before processing unordered: %J',
@ -1814,7 +1840,8 @@ export class UpdatesManager {
) )
} }
// process unordered updates (or updates received from diff) this.log.debug('processing pending unordered updates')
while (pendingUnorderedUpdates.length) { while (pendingUnorderedUpdates.length) {
const pending = pendingUnorderedUpdates.popFront()! const pending = pendingUnorderedUpdates.popFront()!

View file

@ -35,7 +35,7 @@ export abstract class PersistentConnection extends EventEmitter {
private _previousWait: number | null = null private _previousWait: number | null = null
private _reconnectionTimeout: NodeJS.Timeout | null = null private _reconnectionTimeout: NodeJS.Timeout | null = null
private _shouldReconnectImmediately = false private _shouldReconnectImmediately = false
private _disconnectedManually = true private _disconnectedManually = false
// inactivity timeout // inactivity timeout
private _inactivityTimeout: NodeJS.Timeout | null = null private _inactivityTimeout: NodeJS.Timeout | null = null
@ -182,14 +182,13 @@ export abstract class PersistentConnection extends EventEmitter {
} }
this._inactive = false this._inactive = false
this._disconnectedManually = false
this._transport.connect(this.params.dc, this.params.testMode) this._transport.connect(this.params.dc, this.params.testMode)
} }
reconnect(): void { reconnect(): void {
if (this._inactive) return if (this._inactive) return
this._disconnectedManually = false
// if we are already connected // if we are already connected
if (this.isConnected) { if (this.isConnected) {
this._shouldReconnectImmediately = true this._shouldReconnectImmediately = true

View file

@ -757,7 +757,7 @@ export class SessionConnection extends PersistentConnection {
rpc.done = true rpc.done = true
this.log.verbose('<<< (%s) %j', rpc.method, result) this.log.verbose('<<< (%s:%l) %j', rpc.method, reqMsgId, result)
if (result._ === 'mt_rpc_error') { if (result._ === 'mt_rpc_error') {
const res = result as mtp.RawMt_rpc_error const res = result as mtp.RawMt_rpc_error