From 83746aa46046a0cf8781ebef87242ac3f06fb915 Mon Sep 17 00:00:00 2001 From: Alina Sireneva Date: Tue, 12 Mar 2024 13:15:27 +0300 Subject: [PATCH 1/3] feat: initial support for connection states --- packages/core/scripts/generate-client.cjs | 1 + packages/core/src/highlevel/base.ts | 18 ++++++++++- packages/core/src/highlevel/client.ts | 3 ++ packages/core/src/highlevel/client.types.ts | 11 +++++++ .../core/src/highlevel/updates/manager.ts | 1 - packages/core/src/highlevel/worker/port.ts | 7 ++++- packages/core/src/network/client.ts | 2 ++ .../src/network/multi-session-connection.ts | 7 +++++ packages/core/src/network/network-manager.ts | 26 ++++++++++++++++ .../core/src/network/persistent-connection.ts | 30 ++++++++++++++++++- .../core/src/network/session-connection.ts | 25 ++++++++++++++-- packages/core/src/platform.ts | 2 ++ packages/node/src/client.ts | 10 ++++++- packages/web/src/client.ts | 10 ++++++- packages/web/src/platform.ts | 17 +++++++++++ 15 files changed, 161 insertions(+), 9 deletions(-) diff --git a/packages/core/scripts/generate-client.cjs b/packages/core/scripts/generate-client.cjs index 4ae00832..3568a816 100644 --- a/packages/core/scripts/generate-client.cjs +++ b/packages/core/scripts/generate-client.cjs @@ -727,6 +727,7 @@ on(name: string, handler: (...args: any[]) => void): this\n`) 'getPrimaryDcId', 'computeSrpParams', 'computeNewPasswordHash', + 'onConnectionState', ].forEach((name) => { output.write( `TelegramClient.prototype.${name} = function(...args) {\n` + diff --git a/packages/core/src/highlevel/base.ts b/packages/core/src/highlevel/base.ts index 1ad415b9..e397a61a 100644 --- a/packages/core/src/highlevel/base.ts +++ b/packages/core/src/highlevel/base.ts @@ -15,7 +15,7 @@ import { writeStringSession, } from '../utils/index.js' import { LogManager } from '../utils/logger.js' -import { ITelegramClient } from './client.types.js' +import { ConnectionState, ITelegramClient } from './client.types.js' import { AppConfigManager } from './managers/app-config-manager.js' import { ITelegramStorageProvider } from './storage/provider.js' import { TelegramStorageManager, TelegramStorageManagerExtraOptions } from './storage/storage.js' @@ -31,6 +31,7 @@ export interface BaseTelegramClientOptions extends MtClientOptions { export class BaseTelegramClient implements ITelegramClient { readonly updates?: UpdatesManager private _serverUpdatesHandler: (updates: tl.TypeUpdates) => void = () => {} + private _connectionStateHandler: (state: ConnectionState) => void = () => {} constructor(readonly params: BaseTelegramClientOptions) { if (!params.disableUpdates && params.updates !== false) { @@ -41,6 +42,17 @@ export class BaseTelegramClient implements ITelegramClient { this.mt.on('update', (update: tl.TypeUpdates) => { this._serverUpdatesHandler(update) }) + this.mt.on('usable', () => { + this._connectionStateHandler('connected') + }) + this.mt.on('wait', () => { + this._connectionStateHandler('connecting') + }) + this.mt.on('networkChanged', (connected: boolean) => { + if (!connected) { + this._connectionStateHandler('offline') + } + }) } readonly log = this.params.logger ?? new LogManager('client') @@ -267,6 +279,10 @@ export class BaseTelegramClient implements ITelegramClient { this.updates.setHandler(handler) } + onConnectionState(handler: (state: ConnectionState) => void): void { + this._connectionStateHandler = handler + } + async getApiCrenetials() { return { id: this.params.apiId, diff --git a/packages/core/src/highlevel/client.ts b/packages/core/src/highlevel/client.ts index 26796845..0fe778ab 100644 --- a/packages/core/src/highlevel/client.ts +++ b/packages/core/src/highlevel/client.ts @@ -5961,6 +5961,9 @@ TelegramClient.prototype.computeSrpParams = function (...args) { TelegramClient.prototype.computeNewPasswordHash = function (...args) { return this._client.computeNewPasswordHash(...args) } +TelegramClient.prototype.onConnectionState = function (...args) { + return this._client.onConnectionState(...args) +} TelegramClient.prototype.onServerUpdate = function () { throw new Error('onServerUpdate is not available for TelegramClient, use .on() methods instead') } diff --git a/packages/core/src/highlevel/client.types.ts b/packages/core/src/highlevel/client.types.ts index fb74c6bb..24f52728 100644 --- a/packages/core/src/highlevel/client.types.ts +++ b/packages/core/src/highlevel/client.types.ts @@ -8,6 +8,16 @@ import type { TelegramStorageManager } from './storage/storage.js' import type { RawUpdateHandler } from './updates/types.js' import type { StringSessionData } from './utils/string-session.js' +/** + * Connection state of the client + * + * - `offline` - not connected (only emitted when {@link ICorePlatform.onNetworkChanged} callback + * is called with `false`) + * - `connecting` - currently connecting. All requests will be queued until the connection is established + * - `connected` - connected and ready to send requests + */ +export type ConnectionState = 'offline' | 'connecting' | 'connected' + // NB: when adding new methods, don't forget to add them to: // - worker/port.ts // - generate-client script @@ -38,6 +48,7 @@ export interface ITelegramClient { onServerUpdate(handler: (update: tl.TypeUpdates) => void): void onUpdate(handler: RawUpdateHandler): void + onConnectionState(handler: (state: ConnectionState) => void): void getApiCrenetials(): Promise<{ id: number; hash: string }> // todo - this is only used for file dl/ul, which should probably be moved diff --git a/packages/core/src/highlevel/updates/manager.ts b/packages/core/src/highlevel/updates/manager.ts index 56979d81..271b0ec1 100644 --- a/packages/core/src/highlevel/updates/manager.ts +++ b/packages/core/src/highlevel/updates/manager.ts @@ -124,7 +124,6 @@ export class UpdatesManager { oldQts?: number oldDate?: number oldSeq?: number - selfChanged = false // todo what is this? // whether to catch up channels from the locally stored pts catchUpChannels = false diff --git a/packages/core/src/highlevel/worker/port.ts b/packages/core/src/highlevel/worker/port.ts index 666899a2..c579a442 100644 --- a/packages/core/src/highlevel/worker/port.ts +++ b/packages/core/src/highlevel/worker/port.ts @@ -1,7 +1,7 @@ import { tl } from '@mtcute/tl' import { LogManager } from '../../utils/logger.js' -import { ITelegramClient } from '../client.types.js' +import { ConnectionState, ITelegramClient } from '../client.types.js' import { PeersIndex } from '../types/peers/peers-index.js' import { RawUpdateHandler } from '../updates/types.js' import { AppConfigManagerProxy } from './app-config.js' @@ -38,6 +38,11 @@ export abstract class TelegramWorkerPort imp this._updateHandler = handler } + private _connectionStateHandler: (state: ConnectionState) => void = () => {} + onConnectionState(handler: (state: ConnectionState) => void): void { + this._connectionStateHandler = handler + } + private _onMessage: ClientMessageHandler = (message) => { switch (message.type) { case 'log': diff --git a/packages/core/src/network/client.ts b/packages/core/src/network/client.ts index 6d79dd60..d58eba62 100644 --- a/packages/core/src/network/client.ts +++ b/packages/core/src/network/client.ts @@ -301,6 +301,8 @@ export class MtClient extends EventEmitter { useIpv6: Boolean(params.useIpv6), enableErrorReporting: params.enableErrorReporting ?? false, onUsable: () => this.emit('usable'), + onConnecting: () => this.emit('connecting'), + onNetworkChanged: (connected) => this.emit('networkChanged', connected), onUpdate: (upd) => this.emit('update', upd), ...params.network, }, diff --git a/packages/core/src/network/multi-session-connection.ts b/packages/core/src/network/multi-session-connection.ts index b4050ace..e301294f 100644 --- a/packages/core/src/network/multi-session-connection.ts +++ b/packages/core/src/network/multi-session-connection.ts @@ -184,6 +184,7 @@ export class MultiSessionConnection extends EventEmitter { }) }) conn.on('usable', () => this.emit('usable', i)) + conn.on('wait', () => this.emit('wait', i)) conn.on('request-auth', () => this.emit('request-auth', i)) conn.on('flood-done', () => { this._log.debug('received flood-done from connection %d', i) @@ -297,6 +298,12 @@ export class MultiSessionConnection extends EventEmitter { // connection is idle, we don't need to notify it } + notifyNetworkChanged(connected: boolean): void { + for (const conn of this._connections) { + conn.notifyNetworkChanged(connected) + } + } + requestAuth(): void { this._connections[0]._authorize() } diff --git a/packages/core/src/network/network-manager.ts b/packages/core/src/network/network-manager.ts index 95f87810..632c45d6 100644 --- a/packages/core/src/network/network-manager.ts +++ b/packages/core/src/network/network-manager.ts @@ -59,6 +59,8 @@ export interface NetworkManagerParams { emitError: (err: Error, connection?: SessionConnection) => void onUpdate: (upd: tl.TypeUpdates) => void onUsable: () => void + onConnecting: () => void + onNetworkChanged: (connected: boolean) => void } export type ConnectionCountDelegate = (kind: ConnectionKind, dcId: number, isPremium: boolean) => number @@ -490,6 +492,8 @@ export class NetworkManager { return { main, media } } + private _resetOnNetworkChange?: () => void + private _switchPrimaryDc(dc: DcConnectionManager) { if (this._primaryDc && this._primaryDc !== dc) { this._primaryDc.setIsPrimary(false) @@ -498,9 +502,16 @@ export class NetworkManager { this._primaryDc = dc dc.setIsPrimary(true) + this.params.onConnecting() + dc.main.on('usable', () => { + if (dc !== this._primaryDc) return this.params.onUsable() }) + dc.main.on('wait', () => { + if (dc !== this._primaryDc) return + this.params.onConnecting() + }) dc.main.on('update', (update: tl.TypeUpdates) => { this._updateHandler(update, false) }) @@ -557,6 +568,8 @@ export class NetworkManager { throw new MtArgumentError('DC manager already exists') } + this._resetOnNetworkChange = getPlatform().onNetworkChanged?.(this.notifyNetworkChanged.bind(this)) + const dc = new DcConnectionManager(this, defaultDcs.main.id, defaultDcs, true) this._dcConnections.set(defaultDcs.main.id, dc) await this._switchPrimaryDc(dc) @@ -648,6 +661,18 @@ export class NetworkManager { this.resetSessions() } + notifyNetworkChanged(connected: boolean): void { + this._log.debug('network changed: %s', connected ? 'connected' : 'disconnected') + this.params.onNetworkChanged(connected) + + for (const dc of this._dcConnections.values()) { + dc.main.notifyNetworkChanged(connected) + dc.upload.notifyNetworkChanged(connected) + dc.download.notifyNetworkChanged(connected) + dc.downloadSmall.notifyNetworkChanged(connected) + } + } + resetSessions(): void { const dc = this._primaryDc if (!dc) return @@ -837,5 +862,6 @@ export class NetworkManager { dc.destroy() } this.config.offReload(this._onConfigChanged) + this._resetOnNetworkChange?.() } } diff --git a/packages/core/src/network/persistent-connection.ts b/packages/core/src/network/persistent-connection.ts index fec0510e..863e6bf4 100644 --- a/packages/core/src/network/persistent-connection.ts +++ b/packages/core/src/network/persistent-connection.ts @@ -34,6 +34,8 @@ export abstract class PersistentConnection extends EventEmitter { private _consequentFails = 0 private _previousWait: number | null = null private _reconnectionTimeout: NodeJS.Timeout | null = null + private _shouldReconnectImmediately = false + private _disconnectedManually = true // inactivity timeout private _inactivityTimeout: NodeJS.Timeout | null = null @@ -128,7 +130,14 @@ export abstract class PersistentConnection extends EventEmitter { onTransportClose(): void { // transport closed because of inactivity // obviously we dont want to reconnect then - if (this._inactive) return + if (this._inactive || this._disconnectedManually) return + + if (this._shouldReconnectImmediately) { + this._shouldReconnectImmediately = false + this.connect() + + return + } this._consequentFails += 1 @@ -169,6 +178,7 @@ export abstract class PersistentConnection extends EventEmitter { if (this._reconnectionTimeout != null) { clearTimeout(this._reconnectionTimeout) + this._reconnectionTimeout = null } this._inactive = false @@ -176,6 +186,24 @@ export abstract class PersistentConnection extends EventEmitter { } reconnect(): void { + if (this._inactive) return + + this._disconnectedManually = false + + // if we are already connected + if (this.isConnected) { + this._shouldReconnectImmediately = true + this._transport.close() + + return + } + + // if reconnection timeout is pending, it will be cancelled in connect() + this.connect() + } + + disconnectManual(): void { + this._disconnectedManually = true this._transport.close() } diff --git a/packages/core/src/network/session-connection.ts b/packages/core/src/network/session-connection.ts index c16ab4ed..19e239b4 100644 --- a/packages/core/src/network/session-connection.ts +++ b/packages/core/src/network/session-connection.ts @@ -5,6 +5,7 @@ import Long from 'long' import { mtp, tl } from '@mtcute/tl' import { TlBinaryReader, TlBinaryWriter, TlReaderMap, TlSerializationCounter, TlWriterMap } from '@mtcute/tl-runtime' +import { getPlatform } from '../platform.js' import { MtArgumentError, MtcuteError, MtTimeoutError } from '../types/index.js' import { createAesIgeForMessageOld } from '../utils/crypto/mtproto.js' import { reportUnknownError } from '../utils/error-reporting.js' @@ -103,6 +104,8 @@ export class SessionConnection extends PersistentConnection { this._handleRawMessage = this._handleRawMessage.bind(this) } + private _online = getPlatform().isOnline?.() ?? true + getAuthKey(temp = false): Uint8Array | null { const key = temp ? this._session._authKeyTemp : this._session._authKey @@ -1435,6 +1438,16 @@ export class SessionConnection extends PersistentConnection { return pending.promise } + notifyNetworkChanged(online: boolean): void { + this._online = online + + if (online) { + this.reconnect() + } else { + this.disconnectManual() + } + } + private _cancelRpc(rpc: PendingRpc, onTimeout = false, abortSignal?: AbortSignal): void { if (rpc.done) return @@ -1510,12 +1523,18 @@ export class SessionConnection extends PersistentConnection { } private _flush(): void { - if (!this._session._authKey.ready || this._isPfsBindingPending || this._session.current429Timeout) { + if ( + !this.isConnected || + !this._session._authKey.ready || + this._isPfsBindingPending || + this._session.current429Timeout + ) { this.log.debug( - 'skipping flush, connection is not usable (auth key ready = %b, pfs binding pending = %b, 429 timeout = %b)', + 'skipping flush, connection is not usable (connected = %b, auth key ready = %b, pfs binding pending = %b, 429 timeout = %b)', + this.isConnected, this._session._authKey.ready, this._isPfsBindingPending, - Boolean(this._session.current429Timeout), + this._session.current429Timeout, ) // it will be flushed once connection is usable diff --git a/packages/core/src/platform.ts b/packages/core/src/platform.ts index 8d5064a6..86b9dbb4 100644 --- a/packages/core/src/platform.ts +++ b/packages/core/src/platform.ts @@ -14,6 +14,8 @@ export interface ICorePlatform extends ITlPlatform { fileSize?: number fileName?: string } | null> + onNetworkChanged?(fn: (connected: boolean) => void): () => void + isOnline?(): boolean } // eslint-disable-next-line diff --git a/packages/node/src/client.ts b/packages/node/src/client.ts index 2953c7c0..fa337990 100644 --- a/packages/node/src/client.ts +++ b/packages/node/src/client.ts @@ -41,11 +41,19 @@ export interface BaseTelegramClientOptions * @default `"client.session"` */ storage?: string | ITelegramStorageProvider + + /** + * **ADVANCED USE ONLY** + * + * Whether to not set up the platform. + * This is useful if you call `setPlatform` yourself. + */ + platformless?: boolean } export class BaseTelegramClient extends BaseTelegramClientBase { constructor(opts: BaseTelegramClientOptions) { - setPlatform(new NodePlatform()) + if (!opts.platformless) setPlatform(new NodePlatform()) super({ // eslint-disable-next-line diff --git a/packages/web/src/client.ts b/packages/web/src/client.ts index 566ac30e..648181b0 100644 --- a/packages/web/src/client.ts +++ b/packages/web/src/client.ts @@ -25,11 +25,19 @@ export interface BaseTelegramClientOptions * @default `"client.session"` */ storage?: string | ITelegramStorageProvider + + /** + * **ADVANCED USE ONLY** + * + * Whether to not set up the platform. + * This is useful if you call `setPlatform` yourself. + */ + platformless?: boolean } export class BaseTelegramClient extends BaseTelegramClientBase { constructor(opts: BaseTelegramClientOptions) { - setPlatform(new WebPlatform()) + if (!opts.platformless) setPlatform(new WebPlatform()) super({ crypto: new WebCryptoProvider(), diff --git a/packages/web/src/platform.ts b/packages/web/src/platform.ts index e230866e..027c47b5 100644 --- a/packages/web/src/platform.ts +++ b/packages/web/src/platform.ts @@ -29,6 +29,23 @@ export class WebPlatform implements ICorePlatform { return null } + onNetworkChanged(fn: (connected: boolean) => void) { + if (!('onLine' in navigator)) return () => {} + + const onlineHandler = () => fn(navigator.onLine) + window.addEventListener('online', onlineHandler) + window.addEventListener('offline', onlineHandler) + + return () => { + window.removeEventListener('online', onlineHandler) + window.removeEventListener('offline', onlineHandler) + } + } + + isOnline(): boolean { + return navigator.onLine + } + // ITlPlatform utf8ByteLength!: typeof utf8ByteLength utf8Encode!: typeof utf8Encode From ab2425a26ed14caab9028bb0da5069ab462d5464 Mon Sep 17 00:00:00 2001 From: Alina Sireneva Date: Thu, 14 Mar 2024 01:22:05 +0300 Subject: [PATCH 2/3] feat: support for "updating" states + improved catch up a bit --- packages/core/src/highlevel/base.ts | 3 + packages/core/src/highlevel/client.types.ts | 8 +- .../core/src/highlevel/updates/manager.ts | 201 ++++++++++-------- .../core/src/network/persistent-connection.ts | 5 +- .../core/src/network/session-connection.ts | 2 +- 5 files changed, 126 insertions(+), 93 deletions(-) diff --git a/packages/core/src/highlevel/base.ts b/packages/core/src/highlevel/base.ts index e397a61a..abc9a84f 100644 --- a/packages/core/src/highlevel/base.ts +++ b/packages/core/src/highlevel/base.ts @@ -37,6 +37,9 @@ export class BaseTelegramClient implements ITelegramClient { if (!params.disableUpdates && params.updates !== false) { this.updates = new UpdatesManager(this, params.updates) this._serverUpdatesHandler = this.updates.handleUpdate.bind(this.updates) + this.updates.onCatchingUp((catchingUp) => { + this._connectionStateHandler(catchingUp ? 'updating' : 'connected') + }) } this.mt.on('update', (update: tl.TypeUpdates) => { diff --git a/packages/core/src/highlevel/client.types.ts b/packages/core/src/highlevel/client.types.ts index 24f52728..fd914152 100644 --- a/packages/core/src/highlevel/client.types.ts +++ b/packages/core/src/highlevel/client.types.ts @@ -14,9 +14,13 @@ import type { StringSessionData } from './utils/string-session.js' * - `offline` - not connected (only emitted when {@link ICorePlatform.onNetworkChanged} callback * is called with `false`) * - `connecting` - currently connecting. All requests will be queued until the connection is established - * - `connected` - connected and ready to send requests + * - `updating` - connected and is currently updating the state (i.e. downloading missing updates). + * At this point client is already fully operational, but some updates may be missing. + * Is only emitted when updates manager is enabled. + * - `connected` - connected and ready to send requests. When updates manager is enabled, this state + * may be emitted before `updating` state */ -export type ConnectionState = 'offline' | 'connecting' | 'connected' +export type ConnectionState = 'offline' | 'connecting' | 'updating' | 'connected' // NB: when adding new methods, don't forget to add them to: // - worker/port.ts diff --git a/packages/core/src/highlevel/updates/manager.ts b/packages/core/src/highlevel/updates/manager.ts index 271b0ec1..fc9a3e52 100644 --- a/packages/core/src/highlevel/updates/manager.ts +++ b/packages/core/src/highlevel/updates/manager.ts @@ -83,6 +83,7 @@ import { // } const KEEP_ALIVE_INTERVAL = 15 * 60 * 1000 // 15 minutes +const UPDATES_TOO_LONG = { _: 'updatesTooLong' } as const // todo: fix docs export class UpdatesManager { @@ -126,7 +127,7 @@ export class UpdatesManager { oldSeq?: number // whether to catch up channels from the locally stored pts - catchUpChannels = false + catchingUp = false catchUpOnStart = this.params.catchUp ?? false cpts = new Map() @@ -137,6 +138,8 @@ export class UpdatesManager { log = this.client.log.create('updates') private _handler: RawUpdateHandler = () => {} + private _onCatchingUp: (catchingUp: boolean) => void = () => {} + auth?: CurrentUserInfo | null // todo: do we need a local copy? keepAliveInterval?: NodeJS.Timeout @@ -160,15 +163,17 @@ export class UpdatesManager { this._handler = handler } + onCatchingUp(handler: (catchingUp: boolean) => void): void { + this._onCatchingUp = handler + } + destroy() { this.stopLoop() } notifyLoggedIn(self: CurrentUserInfo): void { this.auth = self - this._fetchUpdatesState() - .then(() => this.startLoop()) - .catch((err) => this.client.emitError(err)) + this.startLoop().catch((err) => this.client.emitError(err)) } notifyLoggedOut(): void { @@ -184,7 +189,7 @@ export class UpdatesManager { private _onKeepAlive() { this.log.debug('no updates for >15 minutes, catching up') - this.handleUpdate({ _: 'updatesTooLong' }) + this.handleUpdate(UPDATES_TOO_LONG) } /** @@ -197,6 +202,7 @@ export class UpdatesManager { * * > **Note**: If you are using {@link UpdatesManagerParams.catchUp} option, * > catching up will be done in background, you can't await it. + * > Instead, listen for the `updating` and `connected` connection state events. */ async startLoop(): Promise { if (this.updatesLoopActive) return @@ -208,6 +214,7 @@ export class UpdatesManager { // start updates loop in background this.updatesLoopActive = true + clearInterval(this.keepAliveInterval) this.keepAliveInterval = setInterval(this._onKeepAlive, KEEP_ALIVE_INTERVAL) this._loop().catch((err) => this.client.emitError(err)) @@ -245,16 +252,23 @@ export class UpdatesManager { /** * Catch up with the server by loading missed updates. - * + *. * > **Note**: In case the storage was not properly * > closed the last time, "catching up" might * > result in duplicate updates. */ catchUp(): void { + if (!this.updatesLoopActive) { + this.log.warn('catch up requested, but updates loop is not active, ignoring') + + return + } + this.log.debug('catch up requested') - this.catchUpChannels = true - this.handleUpdate({ _: 'updatesTooLong' }) + this._onCatchingUp(true) + this.catchingUp = true + this.handleUpdate(UPDATES_TOO_LONG) } handleClientUpdate(update: tl.TypeUpdates, noDispatch = true): void { @@ -766,7 +780,7 @@ export class UpdatesManager { let _pts: number | null | undefined = cpts.get(channelId) - if (!_pts && this.catchUpChannels) { + if (!_pts && this.catchingUp) { _pts = await client.storage.updates.getChannelPts(channelId) } if (!_pts) _pts = fallbackPts @@ -936,93 +950,94 @@ export class UpdatesManager { async _fetchDifference(requestedDiff: Map>): Promise { const { client, log, pendingPtsUpdates, pendingUnorderedUpdates } = this - for (;;) { - const diff = await client.call({ - _: 'updates.getDifference', - pts: this.pts!, - date: this.date!, - qts: this.qts!, - }) + const diff = await client.call({ + _: 'updates.getDifference', + pts: this.pts!, + date: this.date!, + qts: this.qts!, + }) - switch (diff._) { - case 'updates.differenceEmpty': - log.debug('updates.getDifference returned updates.differenceEmpty') + switch (diff._) { + case 'updates.differenceEmpty': + log.debug('updates.getDifference returned updates.differenceEmpty') - return - case 'updates.differenceTooLong': - this.pts = diff.pts - log.debug('updates.getDifference returned updates.differenceTooLong') + return + case 'updates.differenceTooLong': + this.pts = diff.pts + log.debug('updates.getDifference returned updates.differenceTooLong') - return - } + return + } - const fetchedState = diff._ === 'updates.difference' ? diff.state : diff.intermediateState + const fetchedState = diff._ === 'updates.difference' ? diff.state : diff.intermediateState - log.debug( - 'updates.getDifference returned %d messages, %d updates. new pts: %d, qts: %d, seq: %d, final: %b', - diff.newMessages.length, - diff.otherUpdates.length, - fetchedState.pts, - fetchedState.qts, - fetchedState.seq, - diff._ === 'updates.difference', - ) + log.debug( + 'updates.getDifference returned %d messages, %d updates. new pts: %d, qts: %d, seq: %d, final: %b', + diff.newMessages.length, + diff.otherUpdates.length, + fetchedState.pts, + fetchedState.qts, + fetchedState.seq, + diff._ === 'updates.difference', + ) - const peers = PeersIndex.from(diff) + const peers = PeersIndex.from(diff) - diff.newMessages.forEach((message) => { - log.debug('processing message %d in %j (%s) from common diff', message.id, message.peerId, message._) + diff.newMessages.forEach((message) => { + log.debug('processing message %d in %j (%s) from common diff', message.id, message.peerId, message._) - if (message._ === 'messageEmpty') return + if (message._ === 'messageEmpty') return - // pts does not need to be checked for them - pendingUnorderedUpdates.pushBack(toPendingUpdate(messageToUpdate(message), peers, true)) - }) - - diff.otherUpdates.forEach((upd) => { - if (upd._ === 'updateChannelTooLong') { - log.debug( - 'received updateChannelTooLong for channel %d in common diff (pts = %d), fetching diff', - upd.channelId, - upd.pts, - ) - - this._fetchChannelDifferenceLater(requestedDiff, upd.channelId, upd.pts) - - return - } - - if (isMessageEmpty(upd)) return - - const parsed = toPendingUpdate(upd, peers, true) - - if (parsed.channelId && parsed.ptsBefore) { - // we need to check pts for these updates, put into pts queue - pendingPtsUpdates.add(parsed) - } else { - // the updates are in order already, we can treat them as unordered - pendingUnorderedUpdates.pushBack(parsed) - } + // pts does not need to be checked for them + pendingUnorderedUpdates.pushBack(toPendingUpdate(messageToUpdate(message), peers, true)) + }) + diff.otherUpdates.forEach((upd) => { + if (upd._ === 'updateChannelTooLong') { log.debug( - 'received %s from common diff, cid: %d, pts_before: %d, pts: %d, qts_before: %d', - upd._, - parsed.channelId, - parsed.ptsBefore, - parsed.pts, - parsed.qtsBefore, + 'received updateChannelTooLong for channel %d in common diff (pts = %d), fetching diff', + upd.channelId, + upd.pts, ) - }) - this.pts = fetchedState.pts - this.qts = fetchedState.qts - this.seq = fetchedState.seq - this.date = fetchedState.date + this._fetchChannelDifferenceLater(requestedDiff, upd.channelId, upd.pts) - if (diff._ === 'updates.difference') { return } + + if (isMessageEmpty(upd)) return + + const parsed = toPendingUpdate(upd, peers, true) + + if (parsed.channelId && parsed.ptsBefore) { + // we need to check pts for these updates, put into pts queue + pendingPtsUpdates.add(parsed) + } else { + // the updates are in order already, we can treat them as unordered + pendingUnorderedUpdates.pushBack(parsed) + } + + log.debug( + 'received %s from common diff, cid: %d, pts_before: %d, pts: %d, qts_before: %d', + upd._, + parsed.channelId, + parsed.ptsBefore, + parsed.pts, + parsed.qtsBefore, + ) + }) + + this.pts = fetchedState.pts + this.qts = fetchedState.qts + this.seq = fetchedState.seq + this.date = fetchedState.date + + if (diff._ === 'updates.difference') { + return } + + // fetch the next chunk in next tick + this.handleUpdate(UPDATES_TOO_LONG) } _fetchDifferenceLater(requestedDiff: Map>): void { @@ -1293,6 +1308,13 @@ export class UpdatesManager { this.hasTimedoutPostponed ) ) { + if (this.catchingUp) { + // consider catching up completed if there are no more updates + this.log.debug('catching up completed') + this.catchingUp = false + this._onCatchingUp(false) + } + await updatesLoopCv.wait() } if (!this.updatesLoopActive) break @@ -1309,7 +1331,8 @@ export class UpdatesManager { const requestedDiff = new Map>() - // first process pending containers + this.log.debug('processing pending containers') + while (pendingUpdateContainers.length) { const { upd, seqStart, seqEnd } = pendingUpdateContainers.popFront()! @@ -1515,7 +1538,8 @@ export class UpdatesManager { } } - // process pts-ordered updates + this.log.debug('processing pending pts-ordered updates') + while (pendingPtsUpdates.length) { const pending = pendingPtsUpdates.popFront()! const upd = pending.update @@ -1527,7 +1551,7 @@ export class UpdatesManager { if (!pending.channelId) localPts = this.pts! else if (cpts.has(pending.channelId)) { localPts = cpts.get(pending.channelId)! - } else if (this.catchUpChannels) { + } else if (this.catchingUp) { // only load stored channel pts in case // the user has enabled catching up. // not loading stored pts effectively disables @@ -1616,7 +1640,8 @@ export class UpdatesManager { await this._onUpdate(pending, requestedDiff) } - // process postponed pts-ordered updates + this.log.debug('processing postponed pts-ordered updates') + for (let item = pendingPtsUpdatesPostponed._first; item; item = item.n) { // awesome fucking iteration because i'm so fucking tired and wanna kms const pending = item.v @@ -1691,7 +1716,8 @@ export class UpdatesManager { pendingPtsUpdatesPostponed._remove(item) } - // process qts-ordered updates + this.log.debug('processing pending qts-ordered updates') + while (pendingQtsUpdates.length) { const pending = pendingQtsUpdates.popFront()! const upd = pending.update @@ -1745,7 +1771,8 @@ export class UpdatesManager { await this._onUpdate(pending, requestedDiff) } - // process postponed qts-ordered updates + this.log.debug('processing postponed qts-ordered updates') + for (let item = pendingQtsUpdatesPostponed._first; item; item = item.n) { // awesome fucking iteration because i'm so fucking tired and wanna kms const pending = item.v @@ -1796,7 +1823,6 @@ export class UpdatesManager { this.hasTimedoutPostponed = false - // wait for all pending diffs to load while (requestedDiff.size) { log.debug( 'waiting for %d pending diffs before processing unordered: %J', @@ -1814,7 +1840,8 @@ export class UpdatesManager { ) } - // process unordered updates (or updates received from diff) + this.log.debug('processing pending unordered updates') + while (pendingUnorderedUpdates.length) { const pending = pendingUnorderedUpdates.popFront()! diff --git a/packages/core/src/network/persistent-connection.ts b/packages/core/src/network/persistent-connection.ts index 863e6bf4..04719ee5 100644 --- a/packages/core/src/network/persistent-connection.ts +++ b/packages/core/src/network/persistent-connection.ts @@ -35,7 +35,7 @@ export abstract class PersistentConnection extends EventEmitter { private _previousWait: number | null = null private _reconnectionTimeout: NodeJS.Timeout | null = null private _shouldReconnectImmediately = false - private _disconnectedManually = true + private _disconnectedManually = false // inactivity timeout private _inactivityTimeout: NodeJS.Timeout | null = null @@ -182,14 +182,13 @@ export abstract class PersistentConnection extends EventEmitter { } this._inactive = false + this._disconnectedManually = false this._transport.connect(this.params.dc, this.params.testMode) } reconnect(): void { if (this._inactive) return - this._disconnectedManually = false - // if we are already connected if (this.isConnected) { this._shouldReconnectImmediately = true diff --git a/packages/core/src/network/session-connection.ts b/packages/core/src/network/session-connection.ts index 19e239b4..4aedc34a 100644 --- a/packages/core/src/network/session-connection.ts +++ b/packages/core/src/network/session-connection.ts @@ -757,7 +757,7 @@ export class SessionConnection extends PersistentConnection { rpc.done = true - this.log.verbose('<<< (%s) %j', rpc.method, result) + this.log.verbose('<<< (%s:%l) %j', rpc.method, reqMsgId, result) if (result._ === 'mt_rpc_error') { const res = result as mtp.RawMt_rpc_error From c9a4558e5c962a72863b0ac76ba91d389ed02f06 Mon Sep 17 00:00:00 2001 From: Alina Sireneva Date: Fri, 15 Mar 2024 03:20:05 +0300 Subject: [PATCH 3/3] fix: download threads never connecting --- packages/core/src/network/persistent-connection.ts | 2 +- packages/core/src/network/session-connection.ts | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/core/src/network/persistent-connection.ts b/packages/core/src/network/persistent-connection.ts index 04719ee5..4c9b2cac 100644 --- a/packages/core/src/network/persistent-connection.ts +++ b/packages/core/src/network/persistent-connection.ts @@ -35,7 +35,7 @@ export abstract class PersistentConnection extends EventEmitter { private _previousWait: number | null = null private _reconnectionTimeout: NodeJS.Timeout | null = null private _shouldReconnectImmediately = false - private _disconnectedManually = false + protected _disconnectedManually = false // inactivity timeout private _inactivityTimeout: NodeJS.Timeout | null = null diff --git a/packages/core/src/network/session-connection.ts b/packages/core/src/network/session-connection.ts index 4aedc34a..e097eb86 100644 --- a/packages/core/src/network/session-connection.ts +++ b/packages/core/src/network/session-connection.ts @@ -1524,14 +1524,14 @@ export class SessionConnection extends PersistentConnection { private _flush(): void { if ( - !this.isConnected || + this._disconnectedManually || !this._session._authKey.ready || this._isPfsBindingPending || this._session.current429Timeout ) { this.log.debug( - 'skipping flush, connection is not usable (connected = %b, auth key ready = %b, pfs binding pending = %b, 429 timeout = %b)', - this.isConnected, + 'skipping flush, connection is not usable (offline = %b, auth key ready = %b, pfs binding pending = %b, 429 timeout = %b)', + this._disconnectedManually, this._session._authKey.ready, this._isPfsBindingPending, this._session.current429Timeout,