From 83746aa46046a0cf8781ebef87242ac3f06fb915 Mon Sep 17 00:00:00 2001 From: Alina Sireneva Date: Tue, 12 Mar 2024 13:15:27 +0300 Subject: [PATCH] feat: initial support for connection states --- packages/core/scripts/generate-client.cjs | 1 + packages/core/src/highlevel/base.ts | 18 ++++++++++- packages/core/src/highlevel/client.ts | 3 ++ packages/core/src/highlevel/client.types.ts | 11 +++++++ .../core/src/highlevel/updates/manager.ts | 1 - packages/core/src/highlevel/worker/port.ts | 7 ++++- packages/core/src/network/client.ts | 2 ++ .../src/network/multi-session-connection.ts | 7 +++++ packages/core/src/network/network-manager.ts | 26 ++++++++++++++++ .../core/src/network/persistent-connection.ts | 30 ++++++++++++++++++- .../core/src/network/session-connection.ts | 25 ++++++++++++++-- packages/core/src/platform.ts | 2 ++ packages/node/src/client.ts | 10 ++++++- packages/web/src/client.ts | 10 ++++++- packages/web/src/platform.ts | 17 +++++++++++ 15 files changed, 161 insertions(+), 9 deletions(-) diff --git a/packages/core/scripts/generate-client.cjs b/packages/core/scripts/generate-client.cjs index 4ae00832..3568a816 100644 --- a/packages/core/scripts/generate-client.cjs +++ b/packages/core/scripts/generate-client.cjs @@ -727,6 +727,7 @@ on(name: string, handler: (...args: any[]) => void): this\n`) 'getPrimaryDcId', 'computeSrpParams', 'computeNewPasswordHash', + 'onConnectionState', ].forEach((name) => { output.write( `TelegramClient.prototype.${name} = function(...args) {\n` + diff --git a/packages/core/src/highlevel/base.ts b/packages/core/src/highlevel/base.ts index 1ad415b9..e397a61a 100644 --- a/packages/core/src/highlevel/base.ts +++ b/packages/core/src/highlevel/base.ts @@ -15,7 +15,7 @@ import { writeStringSession, } from '../utils/index.js' import { LogManager } from '../utils/logger.js' -import { ITelegramClient } from './client.types.js' +import { ConnectionState, ITelegramClient } from './client.types.js' import { AppConfigManager } from './managers/app-config-manager.js' import { ITelegramStorageProvider } from './storage/provider.js' import { TelegramStorageManager, TelegramStorageManagerExtraOptions } from './storage/storage.js' @@ -31,6 +31,7 @@ export interface BaseTelegramClientOptions extends MtClientOptions { export class BaseTelegramClient implements ITelegramClient { readonly updates?: UpdatesManager private _serverUpdatesHandler: (updates: tl.TypeUpdates) => void = () => {} + private _connectionStateHandler: (state: ConnectionState) => void = () => {} constructor(readonly params: BaseTelegramClientOptions) { if (!params.disableUpdates && params.updates !== false) { @@ -41,6 +42,17 @@ export class BaseTelegramClient implements ITelegramClient { this.mt.on('update', (update: tl.TypeUpdates) => { this._serverUpdatesHandler(update) }) + this.mt.on('usable', () => { + this._connectionStateHandler('connected') + }) + this.mt.on('wait', () => { + this._connectionStateHandler('connecting') + }) + this.mt.on('networkChanged', (connected: boolean) => { + if (!connected) { + this._connectionStateHandler('offline') + } + }) } readonly log = this.params.logger ?? new LogManager('client') @@ -267,6 +279,10 @@ export class BaseTelegramClient implements ITelegramClient { this.updates.setHandler(handler) } + onConnectionState(handler: (state: ConnectionState) => void): void { + this._connectionStateHandler = handler + } + async getApiCrenetials() { return { id: this.params.apiId, diff --git a/packages/core/src/highlevel/client.ts b/packages/core/src/highlevel/client.ts index 26796845..0fe778ab 100644 --- a/packages/core/src/highlevel/client.ts +++ b/packages/core/src/highlevel/client.ts @@ -5961,6 +5961,9 @@ TelegramClient.prototype.computeSrpParams = function (...args) { TelegramClient.prototype.computeNewPasswordHash = function (...args) { return this._client.computeNewPasswordHash(...args) } +TelegramClient.prototype.onConnectionState = function (...args) { + return this._client.onConnectionState(...args) +} TelegramClient.prototype.onServerUpdate = function () { throw new Error('onServerUpdate is not available for TelegramClient, use .on() methods instead') } diff --git a/packages/core/src/highlevel/client.types.ts b/packages/core/src/highlevel/client.types.ts index fb74c6bb..24f52728 100644 --- a/packages/core/src/highlevel/client.types.ts +++ b/packages/core/src/highlevel/client.types.ts @@ -8,6 +8,16 @@ import type { TelegramStorageManager } from './storage/storage.js' import type { RawUpdateHandler } from './updates/types.js' import type { StringSessionData } from './utils/string-session.js' +/** + * Connection state of the client + * + * - `offline` - not connected (only emitted when {@link ICorePlatform.onNetworkChanged} callback + * is called with `false`) + * - `connecting` - currently connecting. All requests will be queued until the connection is established + * - `connected` - connected and ready to send requests + */ +export type ConnectionState = 'offline' | 'connecting' | 'connected' + // NB: when adding new methods, don't forget to add them to: // - worker/port.ts // - generate-client script @@ -38,6 +48,7 @@ export interface ITelegramClient { onServerUpdate(handler: (update: tl.TypeUpdates) => void): void onUpdate(handler: RawUpdateHandler): void + onConnectionState(handler: (state: ConnectionState) => void): void getApiCrenetials(): Promise<{ id: number; hash: string }> // todo - this is only used for file dl/ul, which should probably be moved diff --git a/packages/core/src/highlevel/updates/manager.ts b/packages/core/src/highlevel/updates/manager.ts index 56979d81..271b0ec1 100644 --- a/packages/core/src/highlevel/updates/manager.ts +++ b/packages/core/src/highlevel/updates/manager.ts @@ -124,7 +124,6 @@ export class UpdatesManager { oldQts?: number oldDate?: number oldSeq?: number - selfChanged = false // todo what is this? // whether to catch up channels from the locally stored pts catchUpChannels = false diff --git a/packages/core/src/highlevel/worker/port.ts b/packages/core/src/highlevel/worker/port.ts index 666899a2..c579a442 100644 --- a/packages/core/src/highlevel/worker/port.ts +++ b/packages/core/src/highlevel/worker/port.ts @@ -1,7 +1,7 @@ import { tl } from '@mtcute/tl' import { LogManager } from '../../utils/logger.js' -import { ITelegramClient } from '../client.types.js' +import { ConnectionState, ITelegramClient } from '../client.types.js' import { PeersIndex } from '../types/peers/peers-index.js' import { RawUpdateHandler } from '../updates/types.js' import { AppConfigManagerProxy } from './app-config.js' @@ -38,6 +38,11 @@ export abstract class TelegramWorkerPort imp this._updateHandler = handler } + private _connectionStateHandler: (state: ConnectionState) => void = () => {} + onConnectionState(handler: (state: ConnectionState) => void): void { + this._connectionStateHandler = handler + } + private _onMessage: ClientMessageHandler = (message) => { switch (message.type) { case 'log': diff --git a/packages/core/src/network/client.ts b/packages/core/src/network/client.ts index 6d79dd60..d58eba62 100644 --- a/packages/core/src/network/client.ts +++ b/packages/core/src/network/client.ts @@ -301,6 +301,8 @@ export class MtClient extends EventEmitter { useIpv6: Boolean(params.useIpv6), enableErrorReporting: params.enableErrorReporting ?? false, onUsable: () => this.emit('usable'), + onConnecting: () => this.emit('connecting'), + onNetworkChanged: (connected) => this.emit('networkChanged', connected), onUpdate: (upd) => this.emit('update', upd), ...params.network, }, diff --git a/packages/core/src/network/multi-session-connection.ts b/packages/core/src/network/multi-session-connection.ts index b4050ace..e301294f 100644 --- a/packages/core/src/network/multi-session-connection.ts +++ b/packages/core/src/network/multi-session-connection.ts @@ -184,6 +184,7 @@ export class MultiSessionConnection extends EventEmitter { }) }) conn.on('usable', () => this.emit('usable', i)) + conn.on('wait', () => this.emit('wait', i)) conn.on('request-auth', () => this.emit('request-auth', i)) conn.on('flood-done', () => { this._log.debug('received flood-done from connection %d', i) @@ -297,6 +298,12 @@ export class MultiSessionConnection extends EventEmitter { // connection is idle, we don't need to notify it } + notifyNetworkChanged(connected: boolean): void { + for (const conn of this._connections) { + conn.notifyNetworkChanged(connected) + } + } + requestAuth(): void { this._connections[0]._authorize() } diff --git a/packages/core/src/network/network-manager.ts b/packages/core/src/network/network-manager.ts index 95f87810..632c45d6 100644 --- a/packages/core/src/network/network-manager.ts +++ b/packages/core/src/network/network-manager.ts @@ -59,6 +59,8 @@ export interface NetworkManagerParams { emitError: (err: Error, connection?: SessionConnection) => void onUpdate: (upd: tl.TypeUpdates) => void onUsable: () => void + onConnecting: () => void + onNetworkChanged: (connected: boolean) => void } export type ConnectionCountDelegate = (kind: ConnectionKind, dcId: number, isPremium: boolean) => number @@ -490,6 +492,8 @@ export class NetworkManager { return { main, media } } + private _resetOnNetworkChange?: () => void + private _switchPrimaryDc(dc: DcConnectionManager) { if (this._primaryDc && this._primaryDc !== dc) { this._primaryDc.setIsPrimary(false) @@ -498,9 +502,16 @@ export class NetworkManager { this._primaryDc = dc dc.setIsPrimary(true) + this.params.onConnecting() + dc.main.on('usable', () => { + if (dc !== this._primaryDc) return this.params.onUsable() }) + dc.main.on('wait', () => { + if (dc !== this._primaryDc) return + this.params.onConnecting() + }) dc.main.on('update', (update: tl.TypeUpdates) => { this._updateHandler(update, false) }) @@ -557,6 +568,8 @@ export class NetworkManager { throw new MtArgumentError('DC manager already exists') } + this._resetOnNetworkChange = getPlatform().onNetworkChanged?.(this.notifyNetworkChanged.bind(this)) + const dc = new DcConnectionManager(this, defaultDcs.main.id, defaultDcs, true) this._dcConnections.set(defaultDcs.main.id, dc) await this._switchPrimaryDc(dc) @@ -648,6 +661,18 @@ export class NetworkManager { this.resetSessions() } + notifyNetworkChanged(connected: boolean): void { + this._log.debug('network changed: %s', connected ? 'connected' : 'disconnected') + this.params.onNetworkChanged(connected) + + for (const dc of this._dcConnections.values()) { + dc.main.notifyNetworkChanged(connected) + dc.upload.notifyNetworkChanged(connected) + dc.download.notifyNetworkChanged(connected) + dc.downloadSmall.notifyNetworkChanged(connected) + } + } + resetSessions(): void { const dc = this._primaryDc if (!dc) return @@ -837,5 +862,6 @@ export class NetworkManager { dc.destroy() } this.config.offReload(this._onConfigChanged) + this._resetOnNetworkChange?.() } } diff --git a/packages/core/src/network/persistent-connection.ts b/packages/core/src/network/persistent-connection.ts index fec0510e..863e6bf4 100644 --- a/packages/core/src/network/persistent-connection.ts +++ b/packages/core/src/network/persistent-connection.ts @@ -34,6 +34,8 @@ export abstract class PersistentConnection extends EventEmitter { private _consequentFails = 0 private _previousWait: number | null = null private _reconnectionTimeout: NodeJS.Timeout | null = null + private _shouldReconnectImmediately = false + private _disconnectedManually = true // inactivity timeout private _inactivityTimeout: NodeJS.Timeout | null = null @@ -128,7 +130,14 @@ export abstract class PersistentConnection extends EventEmitter { onTransportClose(): void { // transport closed because of inactivity // obviously we dont want to reconnect then - if (this._inactive) return + if (this._inactive || this._disconnectedManually) return + + if (this._shouldReconnectImmediately) { + this._shouldReconnectImmediately = false + this.connect() + + return + } this._consequentFails += 1 @@ -169,6 +178,7 @@ export abstract class PersistentConnection extends EventEmitter { if (this._reconnectionTimeout != null) { clearTimeout(this._reconnectionTimeout) + this._reconnectionTimeout = null } this._inactive = false @@ -176,6 +186,24 @@ export abstract class PersistentConnection extends EventEmitter { } reconnect(): void { + if (this._inactive) return + + this._disconnectedManually = false + + // if we are already connected + if (this.isConnected) { + this._shouldReconnectImmediately = true + this._transport.close() + + return + } + + // if reconnection timeout is pending, it will be cancelled in connect() + this.connect() + } + + disconnectManual(): void { + this._disconnectedManually = true this._transport.close() } diff --git a/packages/core/src/network/session-connection.ts b/packages/core/src/network/session-connection.ts index c16ab4ed..19e239b4 100644 --- a/packages/core/src/network/session-connection.ts +++ b/packages/core/src/network/session-connection.ts @@ -5,6 +5,7 @@ import Long from 'long' import { mtp, tl } from '@mtcute/tl' import { TlBinaryReader, TlBinaryWriter, TlReaderMap, TlSerializationCounter, TlWriterMap } from '@mtcute/tl-runtime' +import { getPlatform } from '../platform.js' import { MtArgumentError, MtcuteError, MtTimeoutError } from '../types/index.js' import { createAesIgeForMessageOld } from '../utils/crypto/mtproto.js' import { reportUnknownError } from '../utils/error-reporting.js' @@ -103,6 +104,8 @@ export class SessionConnection extends PersistentConnection { this._handleRawMessage = this._handleRawMessage.bind(this) } + private _online = getPlatform().isOnline?.() ?? true + getAuthKey(temp = false): Uint8Array | null { const key = temp ? this._session._authKeyTemp : this._session._authKey @@ -1435,6 +1438,16 @@ export class SessionConnection extends PersistentConnection { return pending.promise } + notifyNetworkChanged(online: boolean): void { + this._online = online + + if (online) { + this.reconnect() + } else { + this.disconnectManual() + } + } + private _cancelRpc(rpc: PendingRpc, onTimeout = false, abortSignal?: AbortSignal): void { if (rpc.done) return @@ -1510,12 +1523,18 @@ export class SessionConnection extends PersistentConnection { } private _flush(): void { - if (!this._session._authKey.ready || this._isPfsBindingPending || this._session.current429Timeout) { + if ( + !this.isConnected || + !this._session._authKey.ready || + this._isPfsBindingPending || + this._session.current429Timeout + ) { this.log.debug( - 'skipping flush, connection is not usable (auth key ready = %b, pfs binding pending = %b, 429 timeout = %b)', + 'skipping flush, connection is not usable (connected = %b, auth key ready = %b, pfs binding pending = %b, 429 timeout = %b)', + this.isConnected, this._session._authKey.ready, this._isPfsBindingPending, - Boolean(this._session.current429Timeout), + this._session.current429Timeout, ) // it will be flushed once connection is usable diff --git a/packages/core/src/platform.ts b/packages/core/src/platform.ts index 8d5064a6..86b9dbb4 100644 --- a/packages/core/src/platform.ts +++ b/packages/core/src/platform.ts @@ -14,6 +14,8 @@ export interface ICorePlatform extends ITlPlatform { fileSize?: number fileName?: string } | null> + onNetworkChanged?(fn: (connected: boolean) => void): () => void + isOnline?(): boolean } // eslint-disable-next-line diff --git a/packages/node/src/client.ts b/packages/node/src/client.ts index 2953c7c0..fa337990 100644 --- a/packages/node/src/client.ts +++ b/packages/node/src/client.ts @@ -41,11 +41,19 @@ export interface BaseTelegramClientOptions * @default `"client.session"` */ storage?: string | ITelegramStorageProvider + + /** + * **ADVANCED USE ONLY** + * + * Whether to not set up the platform. + * This is useful if you call `setPlatform` yourself. + */ + platformless?: boolean } export class BaseTelegramClient extends BaseTelegramClientBase { constructor(opts: BaseTelegramClientOptions) { - setPlatform(new NodePlatform()) + if (!opts.platformless) setPlatform(new NodePlatform()) super({ // eslint-disable-next-line diff --git a/packages/web/src/client.ts b/packages/web/src/client.ts index 566ac30e..648181b0 100644 --- a/packages/web/src/client.ts +++ b/packages/web/src/client.ts @@ -25,11 +25,19 @@ export interface BaseTelegramClientOptions * @default `"client.session"` */ storage?: string | ITelegramStorageProvider + + /** + * **ADVANCED USE ONLY** + * + * Whether to not set up the platform. + * This is useful if you call `setPlatform` yourself. + */ + platformless?: boolean } export class BaseTelegramClient extends BaseTelegramClientBase { constructor(opts: BaseTelegramClientOptions) { - setPlatform(new WebPlatform()) + if (!opts.platformless) setPlatform(new WebPlatform()) super({ crypto: new WebCryptoProvider(), diff --git a/packages/web/src/platform.ts b/packages/web/src/platform.ts index e230866e..027c47b5 100644 --- a/packages/web/src/platform.ts +++ b/packages/web/src/platform.ts @@ -29,6 +29,23 @@ export class WebPlatform implements ICorePlatform { return null } + onNetworkChanged(fn: (connected: boolean) => void) { + if (!('onLine' in navigator)) return () => {} + + const onlineHandler = () => fn(navigator.onLine) + window.addEventListener('online', onlineHandler) + window.addEventListener('offline', onlineHandler) + + return () => { + window.removeEventListener('online', onlineHandler) + window.removeEventListener('offline', onlineHandler) + } + } + + isOnline(): boolean { + return navigator.onLine + } + // ITlPlatform utf8ByteLength!: typeof utf8ByteLength utf8Encode!: typeof utf8Encode