diff --git a/packages/core/src/highlevel/base.ts b/packages/core/src/highlevel/base.ts index e397a61a..abc9a84f 100644 --- a/packages/core/src/highlevel/base.ts +++ b/packages/core/src/highlevel/base.ts @@ -37,6 +37,9 @@ export class BaseTelegramClient implements ITelegramClient { if (!params.disableUpdates && params.updates !== false) { this.updates = new UpdatesManager(this, params.updates) this._serverUpdatesHandler = this.updates.handleUpdate.bind(this.updates) + this.updates.onCatchingUp((catchingUp) => { + this._connectionStateHandler(catchingUp ? 'updating' : 'connected') + }) } this.mt.on('update', (update: tl.TypeUpdates) => { diff --git a/packages/core/src/highlevel/client.types.ts b/packages/core/src/highlevel/client.types.ts index 24f52728..fd914152 100644 --- a/packages/core/src/highlevel/client.types.ts +++ b/packages/core/src/highlevel/client.types.ts @@ -14,9 +14,13 @@ import type { StringSessionData } from './utils/string-session.js' * - `offline` - not connected (only emitted when {@link ICorePlatform.onNetworkChanged} callback * is called with `false`) * - `connecting` - currently connecting. All requests will be queued until the connection is established - * - `connected` - connected and ready to send requests + * - `updating` - connected and is currently updating the state (i.e. downloading missing updates). + * At this point client is already fully operational, but some updates may be missing. + * Is only emitted when updates manager is enabled. + * - `connected` - connected and ready to send requests. When updates manager is enabled, this state + * may be emitted before `updating` state */ -export type ConnectionState = 'offline' | 'connecting' | 'connected' +export type ConnectionState = 'offline' | 'connecting' | 'updating' | 'connected' // NB: when adding new methods, don't forget to add them to: // - worker/port.ts diff --git a/packages/core/src/highlevel/updates/manager.ts b/packages/core/src/highlevel/updates/manager.ts index 271b0ec1..fc9a3e52 100644 --- a/packages/core/src/highlevel/updates/manager.ts +++ b/packages/core/src/highlevel/updates/manager.ts @@ -83,6 +83,7 @@ import { // } const KEEP_ALIVE_INTERVAL = 15 * 60 * 1000 // 15 minutes +const UPDATES_TOO_LONG = { _: 'updatesTooLong' } as const // todo: fix docs export class UpdatesManager { @@ -126,7 +127,7 @@ export class UpdatesManager { oldSeq?: number // whether to catch up channels from the locally stored pts - catchUpChannels = false + catchingUp = false catchUpOnStart = this.params.catchUp ?? false cpts = new Map() @@ -137,6 +138,8 @@ export class UpdatesManager { log = this.client.log.create('updates') private _handler: RawUpdateHandler = () => {} + private _onCatchingUp: (catchingUp: boolean) => void = () => {} + auth?: CurrentUserInfo | null // todo: do we need a local copy? keepAliveInterval?: NodeJS.Timeout @@ -160,15 +163,17 @@ export class UpdatesManager { this._handler = handler } + onCatchingUp(handler: (catchingUp: boolean) => void): void { + this._onCatchingUp = handler + } + destroy() { this.stopLoop() } notifyLoggedIn(self: CurrentUserInfo): void { this.auth = self - this._fetchUpdatesState() - .then(() => this.startLoop()) - .catch((err) => this.client.emitError(err)) + this.startLoop().catch((err) => this.client.emitError(err)) } notifyLoggedOut(): void { @@ -184,7 +189,7 @@ export class UpdatesManager { private _onKeepAlive() { this.log.debug('no updates for >15 minutes, catching up') - this.handleUpdate({ _: 'updatesTooLong' }) + this.handleUpdate(UPDATES_TOO_LONG) } /** @@ -197,6 +202,7 @@ export class UpdatesManager { * * > **Note**: If you are using {@link UpdatesManagerParams.catchUp} option, * > catching up will be done in background, you can't await it. + * > Instead, listen for the `updating` and `connected` connection state events. */ async startLoop(): Promise { if (this.updatesLoopActive) return @@ -208,6 +214,7 @@ export class UpdatesManager { // start updates loop in background this.updatesLoopActive = true + clearInterval(this.keepAliveInterval) this.keepAliveInterval = setInterval(this._onKeepAlive, KEEP_ALIVE_INTERVAL) this._loop().catch((err) => this.client.emitError(err)) @@ -245,16 +252,23 @@ export class UpdatesManager { /** * Catch up with the server by loading missed updates. - * + *. * > **Note**: In case the storage was not properly * > closed the last time, "catching up" might * > result in duplicate updates. */ catchUp(): void { + if (!this.updatesLoopActive) { + this.log.warn('catch up requested, but updates loop is not active, ignoring') + + return + } + this.log.debug('catch up requested') - this.catchUpChannels = true - this.handleUpdate({ _: 'updatesTooLong' }) + this._onCatchingUp(true) + this.catchingUp = true + this.handleUpdate(UPDATES_TOO_LONG) } handleClientUpdate(update: tl.TypeUpdates, noDispatch = true): void { @@ -766,7 +780,7 @@ export class UpdatesManager { let _pts: number | null | undefined = cpts.get(channelId) - if (!_pts && this.catchUpChannels) { + if (!_pts && this.catchingUp) { _pts = await client.storage.updates.getChannelPts(channelId) } if (!_pts) _pts = fallbackPts @@ -936,93 +950,94 @@ export class UpdatesManager { async _fetchDifference(requestedDiff: Map>): Promise { const { client, log, pendingPtsUpdates, pendingUnorderedUpdates } = this - for (;;) { - const diff = await client.call({ - _: 'updates.getDifference', - pts: this.pts!, - date: this.date!, - qts: this.qts!, - }) + const diff = await client.call({ + _: 'updates.getDifference', + pts: this.pts!, + date: this.date!, + qts: this.qts!, + }) - switch (diff._) { - case 'updates.differenceEmpty': - log.debug('updates.getDifference returned updates.differenceEmpty') + switch (diff._) { + case 'updates.differenceEmpty': + log.debug('updates.getDifference returned updates.differenceEmpty') - return - case 'updates.differenceTooLong': - this.pts = diff.pts - log.debug('updates.getDifference returned updates.differenceTooLong') + return + case 'updates.differenceTooLong': + this.pts = diff.pts + log.debug('updates.getDifference returned updates.differenceTooLong') - return - } + return + } - const fetchedState = diff._ === 'updates.difference' ? diff.state : diff.intermediateState + const fetchedState = diff._ === 'updates.difference' ? diff.state : diff.intermediateState - log.debug( - 'updates.getDifference returned %d messages, %d updates. new pts: %d, qts: %d, seq: %d, final: %b', - diff.newMessages.length, - diff.otherUpdates.length, - fetchedState.pts, - fetchedState.qts, - fetchedState.seq, - diff._ === 'updates.difference', - ) + log.debug( + 'updates.getDifference returned %d messages, %d updates. new pts: %d, qts: %d, seq: %d, final: %b', + diff.newMessages.length, + diff.otherUpdates.length, + fetchedState.pts, + fetchedState.qts, + fetchedState.seq, + diff._ === 'updates.difference', + ) - const peers = PeersIndex.from(diff) + const peers = PeersIndex.from(diff) - diff.newMessages.forEach((message) => { - log.debug('processing message %d in %j (%s) from common diff', message.id, message.peerId, message._) + diff.newMessages.forEach((message) => { + log.debug('processing message %d in %j (%s) from common diff', message.id, message.peerId, message._) - if (message._ === 'messageEmpty') return + if (message._ === 'messageEmpty') return - // pts does not need to be checked for them - pendingUnorderedUpdates.pushBack(toPendingUpdate(messageToUpdate(message), peers, true)) - }) - - diff.otherUpdates.forEach((upd) => { - if (upd._ === 'updateChannelTooLong') { - log.debug( - 'received updateChannelTooLong for channel %d in common diff (pts = %d), fetching diff', - upd.channelId, - upd.pts, - ) - - this._fetchChannelDifferenceLater(requestedDiff, upd.channelId, upd.pts) - - return - } - - if (isMessageEmpty(upd)) return - - const parsed = toPendingUpdate(upd, peers, true) - - if (parsed.channelId && parsed.ptsBefore) { - // we need to check pts for these updates, put into pts queue - pendingPtsUpdates.add(parsed) - } else { - // the updates are in order already, we can treat them as unordered - pendingUnorderedUpdates.pushBack(parsed) - } + // pts does not need to be checked for them + pendingUnorderedUpdates.pushBack(toPendingUpdate(messageToUpdate(message), peers, true)) + }) + diff.otherUpdates.forEach((upd) => { + if (upd._ === 'updateChannelTooLong') { log.debug( - 'received %s from common diff, cid: %d, pts_before: %d, pts: %d, qts_before: %d', - upd._, - parsed.channelId, - parsed.ptsBefore, - parsed.pts, - parsed.qtsBefore, + 'received updateChannelTooLong for channel %d in common diff (pts = %d), fetching diff', + upd.channelId, + upd.pts, ) - }) - this.pts = fetchedState.pts - this.qts = fetchedState.qts - this.seq = fetchedState.seq - this.date = fetchedState.date + this._fetchChannelDifferenceLater(requestedDiff, upd.channelId, upd.pts) - if (diff._ === 'updates.difference') { return } + + if (isMessageEmpty(upd)) return + + const parsed = toPendingUpdate(upd, peers, true) + + if (parsed.channelId && parsed.ptsBefore) { + // we need to check pts for these updates, put into pts queue + pendingPtsUpdates.add(parsed) + } else { + // the updates are in order already, we can treat them as unordered + pendingUnorderedUpdates.pushBack(parsed) + } + + log.debug( + 'received %s from common diff, cid: %d, pts_before: %d, pts: %d, qts_before: %d', + upd._, + parsed.channelId, + parsed.ptsBefore, + parsed.pts, + parsed.qtsBefore, + ) + }) + + this.pts = fetchedState.pts + this.qts = fetchedState.qts + this.seq = fetchedState.seq + this.date = fetchedState.date + + if (diff._ === 'updates.difference') { + return } + + // fetch the next chunk in next tick + this.handleUpdate(UPDATES_TOO_LONG) } _fetchDifferenceLater(requestedDiff: Map>): void { @@ -1293,6 +1308,13 @@ export class UpdatesManager { this.hasTimedoutPostponed ) ) { + if (this.catchingUp) { + // consider catching up completed if there are no more updates + this.log.debug('catching up completed') + this.catchingUp = false + this._onCatchingUp(false) + } + await updatesLoopCv.wait() } if (!this.updatesLoopActive) break @@ -1309,7 +1331,8 @@ export class UpdatesManager { const requestedDiff = new Map>() - // first process pending containers + this.log.debug('processing pending containers') + while (pendingUpdateContainers.length) { const { upd, seqStart, seqEnd } = pendingUpdateContainers.popFront()! @@ -1515,7 +1538,8 @@ export class UpdatesManager { } } - // process pts-ordered updates + this.log.debug('processing pending pts-ordered updates') + while (pendingPtsUpdates.length) { const pending = pendingPtsUpdates.popFront()! const upd = pending.update @@ -1527,7 +1551,7 @@ export class UpdatesManager { if (!pending.channelId) localPts = this.pts! else if (cpts.has(pending.channelId)) { localPts = cpts.get(pending.channelId)! - } else if (this.catchUpChannels) { + } else if (this.catchingUp) { // only load stored channel pts in case // the user has enabled catching up. // not loading stored pts effectively disables @@ -1616,7 +1640,8 @@ export class UpdatesManager { await this._onUpdate(pending, requestedDiff) } - // process postponed pts-ordered updates + this.log.debug('processing postponed pts-ordered updates') + for (let item = pendingPtsUpdatesPostponed._first; item; item = item.n) { // awesome fucking iteration because i'm so fucking tired and wanna kms const pending = item.v @@ -1691,7 +1716,8 @@ export class UpdatesManager { pendingPtsUpdatesPostponed._remove(item) } - // process qts-ordered updates + this.log.debug('processing pending qts-ordered updates') + while (pendingQtsUpdates.length) { const pending = pendingQtsUpdates.popFront()! const upd = pending.update @@ -1745,7 +1771,8 @@ export class UpdatesManager { await this._onUpdate(pending, requestedDiff) } - // process postponed qts-ordered updates + this.log.debug('processing postponed qts-ordered updates') + for (let item = pendingQtsUpdatesPostponed._first; item; item = item.n) { // awesome fucking iteration because i'm so fucking tired and wanna kms const pending = item.v @@ -1796,7 +1823,6 @@ export class UpdatesManager { this.hasTimedoutPostponed = false - // wait for all pending diffs to load while (requestedDiff.size) { log.debug( 'waiting for %d pending diffs before processing unordered: %J', @@ -1814,7 +1840,8 @@ export class UpdatesManager { ) } - // process unordered updates (or updates received from diff) + this.log.debug('processing pending unordered updates') + while (pendingUnorderedUpdates.length) { const pending = pendingUnorderedUpdates.popFront()! diff --git a/packages/core/src/network/persistent-connection.ts b/packages/core/src/network/persistent-connection.ts index 863e6bf4..04719ee5 100644 --- a/packages/core/src/network/persistent-connection.ts +++ b/packages/core/src/network/persistent-connection.ts @@ -35,7 +35,7 @@ export abstract class PersistentConnection extends EventEmitter { private _previousWait: number | null = null private _reconnectionTimeout: NodeJS.Timeout | null = null private _shouldReconnectImmediately = false - private _disconnectedManually = true + private _disconnectedManually = false // inactivity timeout private _inactivityTimeout: NodeJS.Timeout | null = null @@ -182,14 +182,13 @@ export abstract class PersistentConnection extends EventEmitter { } this._inactive = false + this._disconnectedManually = false this._transport.connect(this.params.dc, this.params.testMode) } reconnect(): void { if (this._inactive) return - this._disconnectedManually = false - // if we are already connected if (this.isConnected) { this._shouldReconnectImmediately = true diff --git a/packages/core/src/network/session-connection.ts b/packages/core/src/network/session-connection.ts index 19e239b4..4aedc34a 100644 --- a/packages/core/src/network/session-connection.ts +++ b/packages/core/src/network/session-connection.ts @@ -757,7 +757,7 @@ export class SessionConnection extends PersistentConnection { rpc.done = true - this.log.verbose('<<< (%s) %j', rpc.method, result) + this.log.verbose('<<< (%s:%l) %j', rpc.method, reqMsgId, result) if (result._ === 'mt_rpc_error') { const res = result as mtp.RawMt_rpc_error