feat: initial support for connection states

This commit is contained in:
alina 🌸 2024-03-12 13:15:27 +03:00
parent d313743e10
commit 83746aa460
Signed by: teidesu
SSH key fingerprint: SHA256:uNeCpw6aTSU4aIObXLvHfLkDa82HWH9EiOj9AXOIRpI
15 changed files with 161 additions and 9 deletions

View file

@ -727,6 +727,7 @@ on(name: string, handler: (...args: any[]) => void): this\n`)
'getPrimaryDcId', 'getPrimaryDcId',
'computeSrpParams', 'computeSrpParams',
'computeNewPasswordHash', 'computeNewPasswordHash',
'onConnectionState',
].forEach((name) => { ].forEach((name) => {
output.write( output.write(
`TelegramClient.prototype.${name} = function(...args) {\n` + `TelegramClient.prototype.${name} = function(...args) {\n` +

View file

@ -15,7 +15,7 @@ import {
writeStringSession, writeStringSession,
} from '../utils/index.js' } from '../utils/index.js'
import { LogManager } from '../utils/logger.js' import { LogManager } from '../utils/logger.js'
import { ITelegramClient } from './client.types.js' import { ConnectionState, ITelegramClient } from './client.types.js'
import { AppConfigManager } from './managers/app-config-manager.js' import { AppConfigManager } from './managers/app-config-manager.js'
import { ITelegramStorageProvider } from './storage/provider.js' import { ITelegramStorageProvider } from './storage/provider.js'
import { TelegramStorageManager, TelegramStorageManagerExtraOptions } from './storage/storage.js' import { TelegramStorageManager, TelegramStorageManagerExtraOptions } from './storage/storage.js'
@ -31,6 +31,7 @@ export interface BaseTelegramClientOptions extends MtClientOptions {
export class BaseTelegramClient implements ITelegramClient { export class BaseTelegramClient implements ITelegramClient {
readonly updates?: UpdatesManager readonly updates?: UpdatesManager
private _serverUpdatesHandler: (updates: tl.TypeUpdates) => void = () => {} private _serverUpdatesHandler: (updates: tl.TypeUpdates) => void = () => {}
private _connectionStateHandler: (state: ConnectionState) => void = () => {}
constructor(readonly params: BaseTelegramClientOptions) { constructor(readonly params: BaseTelegramClientOptions) {
if (!params.disableUpdates && params.updates !== false) { if (!params.disableUpdates && params.updates !== false) {
@ -41,6 +42,17 @@ export class BaseTelegramClient implements ITelegramClient {
this.mt.on('update', (update: tl.TypeUpdates) => { this.mt.on('update', (update: tl.TypeUpdates) => {
this._serverUpdatesHandler(update) this._serverUpdatesHandler(update)
}) })
this.mt.on('usable', () => {
this._connectionStateHandler('connected')
})
this.mt.on('wait', () => {
this._connectionStateHandler('connecting')
})
this.mt.on('networkChanged', (connected: boolean) => {
if (!connected) {
this._connectionStateHandler('offline')
}
})
} }
readonly log = this.params.logger ?? new LogManager('client') readonly log = this.params.logger ?? new LogManager('client')
@ -267,6 +279,10 @@ export class BaseTelegramClient implements ITelegramClient {
this.updates.setHandler(handler) this.updates.setHandler(handler)
} }
onConnectionState(handler: (state: ConnectionState) => void): void {
this._connectionStateHandler = handler
}
async getApiCrenetials() { async getApiCrenetials() {
return { return {
id: this.params.apiId, id: this.params.apiId,

View file

@ -5961,6 +5961,9 @@ TelegramClient.prototype.computeSrpParams = function (...args) {
TelegramClient.prototype.computeNewPasswordHash = function (...args) { TelegramClient.prototype.computeNewPasswordHash = function (...args) {
return this._client.computeNewPasswordHash(...args) return this._client.computeNewPasswordHash(...args)
} }
TelegramClient.prototype.onConnectionState = function (...args) {
return this._client.onConnectionState(...args)
}
TelegramClient.prototype.onServerUpdate = function () { TelegramClient.prototype.onServerUpdate = function () {
throw new Error('onServerUpdate is not available for TelegramClient, use .on() methods instead') throw new Error('onServerUpdate is not available for TelegramClient, use .on() methods instead')
} }

View file

@ -8,6 +8,16 @@ import type { TelegramStorageManager } from './storage/storage.js'
import type { RawUpdateHandler } from './updates/types.js' import type { RawUpdateHandler } from './updates/types.js'
import type { StringSessionData } from './utils/string-session.js' import type { StringSessionData } from './utils/string-session.js'
/**
* Connection state of the client
*
* - `offline` - not connected (only emitted when {@link ICorePlatform.onNetworkChanged} callback
* is called with `false`)
* - `connecting` - currently connecting. All requests will be queued until the connection is established
* - `connected` - connected and ready to send requests
*/
export type ConnectionState = 'offline' | 'connecting' | 'connected'
// NB: when adding new methods, don't forget to add them to: // NB: when adding new methods, don't forget to add them to:
// - worker/port.ts // - worker/port.ts
// - generate-client script // - generate-client script
@ -38,6 +48,7 @@ export interface ITelegramClient {
onServerUpdate(handler: (update: tl.TypeUpdates) => void): void onServerUpdate(handler: (update: tl.TypeUpdates) => void): void
onUpdate(handler: RawUpdateHandler): void onUpdate(handler: RawUpdateHandler): void
onConnectionState(handler: (state: ConnectionState) => void): void
getApiCrenetials(): Promise<{ id: number; hash: string }> getApiCrenetials(): Promise<{ id: number; hash: string }>
// todo - this is only used for file dl/ul, which should probably be moved // todo - this is only used for file dl/ul, which should probably be moved

View file

@ -124,7 +124,6 @@ export class UpdatesManager {
oldQts?: number oldQts?: number
oldDate?: number oldDate?: number
oldSeq?: number oldSeq?: number
selfChanged = false // todo what is this?
// whether to catch up channels from the locally stored pts // whether to catch up channels from the locally stored pts
catchUpChannels = false catchUpChannels = false

View file

@ -1,7 +1,7 @@
import { tl } from '@mtcute/tl' import { tl } from '@mtcute/tl'
import { LogManager } from '../../utils/logger.js' import { LogManager } from '../../utils/logger.js'
import { ITelegramClient } from '../client.types.js' import { ConnectionState, ITelegramClient } from '../client.types.js'
import { PeersIndex } from '../types/peers/peers-index.js' import { PeersIndex } from '../types/peers/peers-index.js'
import { RawUpdateHandler } from '../updates/types.js' import { RawUpdateHandler } from '../updates/types.js'
import { AppConfigManagerProxy } from './app-config.js' import { AppConfigManagerProxy } from './app-config.js'
@ -38,6 +38,11 @@ export abstract class TelegramWorkerPort<Custom extends WorkerCustomMethods> imp
this._updateHandler = handler this._updateHandler = handler
} }
private _connectionStateHandler: (state: ConnectionState) => void = () => {}
onConnectionState(handler: (state: ConnectionState) => void): void {
this._connectionStateHandler = handler
}
private _onMessage: ClientMessageHandler = (message) => { private _onMessage: ClientMessageHandler = (message) => {
switch (message.type) { switch (message.type) {
case 'log': case 'log':

View file

@ -301,6 +301,8 @@ export class MtClient extends EventEmitter {
useIpv6: Boolean(params.useIpv6), useIpv6: Boolean(params.useIpv6),
enableErrorReporting: params.enableErrorReporting ?? false, enableErrorReporting: params.enableErrorReporting ?? false,
onUsable: () => this.emit('usable'), onUsable: () => this.emit('usable'),
onConnecting: () => this.emit('connecting'),
onNetworkChanged: (connected) => this.emit('networkChanged', connected),
onUpdate: (upd) => this.emit('update', upd), onUpdate: (upd) => this.emit('update', upd),
...params.network, ...params.network,
}, },

View file

@ -184,6 +184,7 @@ export class MultiSessionConnection extends EventEmitter {
}) })
}) })
conn.on('usable', () => this.emit('usable', i)) conn.on('usable', () => this.emit('usable', i))
conn.on('wait', () => this.emit('wait', i))
conn.on('request-auth', () => this.emit('request-auth', i)) conn.on('request-auth', () => this.emit('request-auth', i))
conn.on('flood-done', () => { conn.on('flood-done', () => {
this._log.debug('received flood-done from connection %d', i) this._log.debug('received flood-done from connection %d', i)
@ -297,6 +298,12 @@ export class MultiSessionConnection extends EventEmitter {
// connection is idle, we don't need to notify it // connection is idle, we don't need to notify it
} }
notifyNetworkChanged(connected: boolean): void {
for (const conn of this._connections) {
conn.notifyNetworkChanged(connected)
}
}
requestAuth(): void { requestAuth(): void {
this._connections[0]._authorize() this._connections[0]._authorize()
} }

View file

@ -59,6 +59,8 @@ export interface NetworkManagerParams {
emitError: (err: Error, connection?: SessionConnection) => void emitError: (err: Error, connection?: SessionConnection) => void
onUpdate: (upd: tl.TypeUpdates) => void onUpdate: (upd: tl.TypeUpdates) => void
onUsable: () => void onUsable: () => void
onConnecting: () => void
onNetworkChanged: (connected: boolean) => void
} }
export type ConnectionCountDelegate = (kind: ConnectionKind, dcId: number, isPremium: boolean) => number export type ConnectionCountDelegate = (kind: ConnectionKind, dcId: number, isPremium: boolean) => number
@ -490,6 +492,8 @@ export class NetworkManager {
return { main, media } return { main, media }
} }
private _resetOnNetworkChange?: () => void
private _switchPrimaryDc(dc: DcConnectionManager) { private _switchPrimaryDc(dc: DcConnectionManager) {
if (this._primaryDc && this._primaryDc !== dc) { if (this._primaryDc && this._primaryDc !== dc) {
this._primaryDc.setIsPrimary(false) this._primaryDc.setIsPrimary(false)
@ -498,9 +502,16 @@ export class NetworkManager {
this._primaryDc = dc this._primaryDc = dc
dc.setIsPrimary(true) dc.setIsPrimary(true)
this.params.onConnecting()
dc.main.on('usable', () => { dc.main.on('usable', () => {
if (dc !== this._primaryDc) return
this.params.onUsable() this.params.onUsable()
}) })
dc.main.on('wait', () => {
if (dc !== this._primaryDc) return
this.params.onConnecting()
})
dc.main.on('update', (update: tl.TypeUpdates) => { dc.main.on('update', (update: tl.TypeUpdates) => {
this._updateHandler(update, false) this._updateHandler(update, false)
}) })
@ -557,6 +568,8 @@ export class NetworkManager {
throw new MtArgumentError('DC manager already exists') throw new MtArgumentError('DC manager already exists')
} }
this._resetOnNetworkChange = getPlatform().onNetworkChanged?.(this.notifyNetworkChanged.bind(this))
const dc = new DcConnectionManager(this, defaultDcs.main.id, defaultDcs, true) const dc = new DcConnectionManager(this, defaultDcs.main.id, defaultDcs, true)
this._dcConnections.set(defaultDcs.main.id, dc) this._dcConnections.set(defaultDcs.main.id, dc)
await this._switchPrimaryDc(dc) await this._switchPrimaryDc(dc)
@ -648,6 +661,18 @@ export class NetworkManager {
this.resetSessions() this.resetSessions()
} }
notifyNetworkChanged(connected: boolean): void {
this._log.debug('network changed: %s', connected ? 'connected' : 'disconnected')
this.params.onNetworkChanged(connected)
for (const dc of this._dcConnections.values()) {
dc.main.notifyNetworkChanged(connected)
dc.upload.notifyNetworkChanged(connected)
dc.download.notifyNetworkChanged(connected)
dc.downloadSmall.notifyNetworkChanged(connected)
}
}
resetSessions(): void { resetSessions(): void {
const dc = this._primaryDc const dc = this._primaryDc
if (!dc) return if (!dc) return
@ -837,5 +862,6 @@ export class NetworkManager {
dc.destroy() dc.destroy()
} }
this.config.offReload(this._onConfigChanged) this.config.offReload(this._onConfigChanged)
this._resetOnNetworkChange?.()
} }
} }

View file

@ -34,6 +34,8 @@ export abstract class PersistentConnection extends EventEmitter {
private _consequentFails = 0 private _consequentFails = 0
private _previousWait: number | null = null private _previousWait: number | null = null
private _reconnectionTimeout: NodeJS.Timeout | null = null private _reconnectionTimeout: NodeJS.Timeout | null = null
private _shouldReconnectImmediately = false
private _disconnectedManually = true
// inactivity timeout // inactivity timeout
private _inactivityTimeout: NodeJS.Timeout | null = null private _inactivityTimeout: NodeJS.Timeout | null = null
@ -128,7 +130,14 @@ export abstract class PersistentConnection extends EventEmitter {
onTransportClose(): void { onTransportClose(): void {
// transport closed because of inactivity // transport closed because of inactivity
// obviously we dont want to reconnect then // obviously we dont want to reconnect then
if (this._inactive) return if (this._inactive || this._disconnectedManually) return
if (this._shouldReconnectImmediately) {
this._shouldReconnectImmediately = false
this.connect()
return
}
this._consequentFails += 1 this._consequentFails += 1
@ -169,6 +178,7 @@ export abstract class PersistentConnection extends EventEmitter {
if (this._reconnectionTimeout != null) { if (this._reconnectionTimeout != null) {
clearTimeout(this._reconnectionTimeout) clearTimeout(this._reconnectionTimeout)
this._reconnectionTimeout = null
} }
this._inactive = false this._inactive = false
@ -176,6 +186,24 @@ export abstract class PersistentConnection extends EventEmitter {
} }
reconnect(): void { reconnect(): void {
if (this._inactive) return
this._disconnectedManually = false
// if we are already connected
if (this.isConnected) {
this._shouldReconnectImmediately = true
this._transport.close()
return
}
// if reconnection timeout is pending, it will be cancelled in connect()
this.connect()
}
disconnectManual(): void {
this._disconnectedManually = true
this._transport.close() this._transport.close()
} }

View file

@ -5,6 +5,7 @@ import Long from 'long'
import { mtp, tl } from '@mtcute/tl' import { mtp, tl } from '@mtcute/tl'
import { TlBinaryReader, TlBinaryWriter, TlReaderMap, TlSerializationCounter, TlWriterMap } from '@mtcute/tl-runtime' import { TlBinaryReader, TlBinaryWriter, TlReaderMap, TlSerializationCounter, TlWriterMap } from '@mtcute/tl-runtime'
import { getPlatform } from '../platform.js'
import { MtArgumentError, MtcuteError, MtTimeoutError } from '../types/index.js' import { MtArgumentError, MtcuteError, MtTimeoutError } from '../types/index.js'
import { createAesIgeForMessageOld } from '../utils/crypto/mtproto.js' import { createAesIgeForMessageOld } from '../utils/crypto/mtproto.js'
import { reportUnknownError } from '../utils/error-reporting.js' import { reportUnknownError } from '../utils/error-reporting.js'
@ -103,6 +104,8 @@ export class SessionConnection extends PersistentConnection {
this._handleRawMessage = this._handleRawMessage.bind(this) this._handleRawMessage = this._handleRawMessage.bind(this)
} }
private _online = getPlatform().isOnline?.() ?? true
getAuthKey(temp = false): Uint8Array | null { getAuthKey(temp = false): Uint8Array | null {
const key = temp ? this._session._authKeyTemp : this._session._authKey const key = temp ? this._session._authKeyTemp : this._session._authKey
@ -1435,6 +1438,16 @@ export class SessionConnection extends PersistentConnection {
return pending.promise return pending.promise
} }
notifyNetworkChanged(online: boolean): void {
this._online = online
if (online) {
this.reconnect()
} else {
this.disconnectManual()
}
}
private _cancelRpc(rpc: PendingRpc, onTimeout = false, abortSignal?: AbortSignal): void { private _cancelRpc(rpc: PendingRpc, onTimeout = false, abortSignal?: AbortSignal): void {
if (rpc.done) return if (rpc.done) return
@ -1510,12 +1523,18 @@ export class SessionConnection extends PersistentConnection {
} }
private _flush(): void { private _flush(): void {
if (!this._session._authKey.ready || this._isPfsBindingPending || this._session.current429Timeout) { if (
!this.isConnected ||
!this._session._authKey.ready ||
this._isPfsBindingPending ||
this._session.current429Timeout
) {
this.log.debug( this.log.debug(
'skipping flush, connection is not usable (auth key ready = %b, pfs binding pending = %b, 429 timeout = %b)', 'skipping flush, connection is not usable (connected = %b, auth key ready = %b, pfs binding pending = %b, 429 timeout = %b)',
this.isConnected,
this._session._authKey.ready, this._session._authKey.ready,
this._isPfsBindingPending, this._isPfsBindingPending,
Boolean(this._session.current429Timeout), this._session.current429Timeout,
) )
// it will be flushed once connection is usable // it will be flushed once connection is usable

View file

@ -14,6 +14,8 @@ export interface ICorePlatform extends ITlPlatform {
fileSize?: number fileSize?: number
fileName?: string fileName?: string
} | null> } | null>
onNetworkChanged?(fn: (connected: boolean) => void): () => void
isOnline?(): boolean
} }
// eslint-disable-next-line // eslint-disable-next-line

View file

@ -41,11 +41,19 @@ export interface BaseTelegramClientOptions
* @default `"client.session"` * @default `"client.session"`
*/ */
storage?: string | ITelegramStorageProvider storage?: string | ITelegramStorageProvider
/**
* **ADVANCED USE ONLY**
*
* Whether to not set up the platform.
* This is useful if you call `setPlatform` yourself.
*/
platformless?: boolean
} }
export class BaseTelegramClient extends BaseTelegramClientBase { export class BaseTelegramClient extends BaseTelegramClientBase {
constructor(opts: BaseTelegramClientOptions) { constructor(opts: BaseTelegramClientOptions) {
setPlatform(new NodePlatform()) if (!opts.platformless) setPlatform(new NodePlatform())
super({ super({
// eslint-disable-next-line // eslint-disable-next-line

View file

@ -25,11 +25,19 @@ export interface BaseTelegramClientOptions
* @default `"client.session"` * @default `"client.session"`
*/ */
storage?: string | ITelegramStorageProvider storage?: string | ITelegramStorageProvider
/**
* **ADVANCED USE ONLY**
*
* Whether to not set up the platform.
* This is useful if you call `setPlatform` yourself.
*/
platformless?: boolean
} }
export class BaseTelegramClient extends BaseTelegramClientBase { export class BaseTelegramClient extends BaseTelegramClientBase {
constructor(opts: BaseTelegramClientOptions) { constructor(opts: BaseTelegramClientOptions) {
setPlatform(new WebPlatform()) if (!opts.platformless) setPlatform(new WebPlatform())
super({ super({
crypto: new WebCryptoProvider(), crypto: new WebCryptoProvider(),

View file

@ -29,6 +29,23 @@ export class WebPlatform implements ICorePlatform {
return null return null
} }
onNetworkChanged(fn: (connected: boolean) => void) {
if (!('onLine' in navigator)) return () => {}
const onlineHandler = () => fn(navigator.onLine)
window.addEventListener('online', onlineHandler)
window.addEventListener('offline', onlineHandler)
return () => {
window.removeEventListener('online', onlineHandler)
window.removeEventListener('offline', onlineHandler)
}
}
isOnline(): boolean {
return navigator.onLine
}
// ITlPlatform // ITlPlatform
utf8ByteLength!: typeof utf8ByteLength utf8ByteLength!: typeof utf8ByteLength
utf8Encode!: typeof utf8Encode utf8Encode!: typeof utf8Encode