Connection state (#22)
This commit is contained in:
commit
3ef585efb2
15 changed files with 282 additions and 97 deletions
|
@ -727,6 +727,7 @@ on(name: string, handler: (...args: any[]) => void): this\n`)
|
||||||
'getPrimaryDcId',
|
'getPrimaryDcId',
|
||||||
'computeSrpParams',
|
'computeSrpParams',
|
||||||
'computeNewPasswordHash',
|
'computeNewPasswordHash',
|
||||||
|
'onConnectionState',
|
||||||
].forEach((name) => {
|
].forEach((name) => {
|
||||||
output.write(
|
output.write(
|
||||||
`TelegramClient.prototype.${name} = function(...args) {\n` +
|
`TelegramClient.prototype.${name} = function(...args) {\n` +
|
||||||
|
|
|
@ -15,7 +15,7 @@ import {
|
||||||
writeStringSession,
|
writeStringSession,
|
||||||
} from '../utils/index.js'
|
} from '../utils/index.js'
|
||||||
import { LogManager } from '../utils/logger.js'
|
import { LogManager } from '../utils/logger.js'
|
||||||
import { ITelegramClient } from './client.types.js'
|
import { ConnectionState, ITelegramClient } from './client.types.js'
|
||||||
import { AppConfigManager } from './managers/app-config-manager.js'
|
import { AppConfigManager } from './managers/app-config-manager.js'
|
||||||
import { ITelegramStorageProvider } from './storage/provider.js'
|
import { ITelegramStorageProvider } from './storage/provider.js'
|
||||||
import { TelegramStorageManager, TelegramStorageManagerExtraOptions } from './storage/storage.js'
|
import { TelegramStorageManager, TelegramStorageManagerExtraOptions } from './storage/storage.js'
|
||||||
|
@ -31,16 +31,31 @@ export interface BaseTelegramClientOptions extends MtClientOptions {
|
||||||
export class BaseTelegramClient implements ITelegramClient {
|
export class BaseTelegramClient implements ITelegramClient {
|
||||||
readonly updates?: UpdatesManager
|
readonly updates?: UpdatesManager
|
||||||
private _serverUpdatesHandler: (updates: tl.TypeUpdates) => void = () => {}
|
private _serverUpdatesHandler: (updates: tl.TypeUpdates) => void = () => {}
|
||||||
|
private _connectionStateHandler: (state: ConnectionState) => void = () => {}
|
||||||
|
|
||||||
constructor(readonly params: BaseTelegramClientOptions) {
|
constructor(readonly params: BaseTelegramClientOptions) {
|
||||||
if (!params.disableUpdates && params.updates !== false) {
|
if (!params.disableUpdates && params.updates !== false) {
|
||||||
this.updates = new UpdatesManager(this, params.updates)
|
this.updates = new UpdatesManager(this, params.updates)
|
||||||
this._serverUpdatesHandler = this.updates.handleUpdate.bind(this.updates)
|
this._serverUpdatesHandler = this.updates.handleUpdate.bind(this.updates)
|
||||||
|
this.updates.onCatchingUp((catchingUp) => {
|
||||||
|
this._connectionStateHandler(catchingUp ? 'updating' : 'connected')
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
this.mt.on('update', (update: tl.TypeUpdates) => {
|
this.mt.on('update', (update: tl.TypeUpdates) => {
|
||||||
this._serverUpdatesHandler(update)
|
this._serverUpdatesHandler(update)
|
||||||
})
|
})
|
||||||
|
this.mt.on('usable', () => {
|
||||||
|
this._connectionStateHandler('connected')
|
||||||
|
})
|
||||||
|
this.mt.on('wait', () => {
|
||||||
|
this._connectionStateHandler('connecting')
|
||||||
|
})
|
||||||
|
this.mt.on('networkChanged', (connected: boolean) => {
|
||||||
|
if (!connected) {
|
||||||
|
this._connectionStateHandler('offline')
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
readonly log = this.params.logger ?? new LogManager('client')
|
readonly log = this.params.logger ?? new LogManager('client')
|
||||||
|
@ -267,6 +282,10 @@ export class BaseTelegramClient implements ITelegramClient {
|
||||||
this.updates.setHandler(handler)
|
this.updates.setHandler(handler)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
onConnectionState(handler: (state: ConnectionState) => void): void {
|
||||||
|
this._connectionStateHandler = handler
|
||||||
|
}
|
||||||
|
|
||||||
async getApiCrenetials() {
|
async getApiCrenetials() {
|
||||||
return {
|
return {
|
||||||
id: this.params.apiId,
|
id: this.params.apiId,
|
||||||
|
|
|
@ -5961,6 +5961,9 @@ TelegramClient.prototype.computeSrpParams = function (...args) {
|
||||||
TelegramClient.prototype.computeNewPasswordHash = function (...args) {
|
TelegramClient.prototype.computeNewPasswordHash = function (...args) {
|
||||||
return this._client.computeNewPasswordHash(...args)
|
return this._client.computeNewPasswordHash(...args)
|
||||||
}
|
}
|
||||||
|
TelegramClient.prototype.onConnectionState = function (...args) {
|
||||||
|
return this._client.onConnectionState(...args)
|
||||||
|
}
|
||||||
TelegramClient.prototype.onServerUpdate = function () {
|
TelegramClient.prototype.onServerUpdate = function () {
|
||||||
throw new Error('onServerUpdate is not available for TelegramClient, use .on() methods instead')
|
throw new Error('onServerUpdate is not available for TelegramClient, use .on() methods instead')
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,20 @@ import type { TelegramStorageManager } from './storage/storage.js'
|
||||||
import type { RawUpdateHandler } from './updates/types.js'
|
import type { RawUpdateHandler } from './updates/types.js'
|
||||||
import type { StringSessionData } from './utils/string-session.js'
|
import type { StringSessionData } from './utils/string-session.js'
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Connection state of the client
|
||||||
|
*
|
||||||
|
* - `offline` - not connected (only emitted when {@link ICorePlatform.onNetworkChanged} callback
|
||||||
|
* is called with `false`)
|
||||||
|
* - `connecting` - currently connecting. All requests will be queued until the connection is established
|
||||||
|
* - `updating` - connected and is currently updating the state (i.e. downloading missing updates).
|
||||||
|
* At this point client is already fully operational, but some updates may be missing.
|
||||||
|
* Is only emitted when updates manager is enabled.
|
||||||
|
* - `connected` - connected and ready to send requests. When updates manager is enabled, this state
|
||||||
|
* may be emitted before `updating` state
|
||||||
|
*/
|
||||||
|
export type ConnectionState = 'offline' | 'connecting' | 'updating' | 'connected'
|
||||||
|
|
||||||
// NB: when adding new methods, don't forget to add them to:
|
// NB: when adding new methods, don't forget to add them to:
|
||||||
// - worker/port.ts
|
// - worker/port.ts
|
||||||
// - generate-client script
|
// - generate-client script
|
||||||
|
@ -38,6 +52,7 @@ export interface ITelegramClient {
|
||||||
|
|
||||||
onServerUpdate(handler: (update: tl.TypeUpdates) => void): void
|
onServerUpdate(handler: (update: tl.TypeUpdates) => void): void
|
||||||
onUpdate(handler: RawUpdateHandler): void
|
onUpdate(handler: RawUpdateHandler): void
|
||||||
|
onConnectionState(handler: (state: ConnectionState) => void): void
|
||||||
|
|
||||||
getApiCrenetials(): Promise<{ id: number; hash: string }>
|
getApiCrenetials(): Promise<{ id: number; hash: string }>
|
||||||
// todo - this is only used for file dl/ul, which should probably be moved
|
// todo - this is only used for file dl/ul, which should probably be moved
|
||||||
|
|
|
@ -83,6 +83,7 @@ import {
|
||||||
// }
|
// }
|
||||||
|
|
||||||
const KEEP_ALIVE_INTERVAL = 15 * 60 * 1000 // 15 minutes
|
const KEEP_ALIVE_INTERVAL = 15 * 60 * 1000 // 15 minutes
|
||||||
|
const UPDATES_TOO_LONG = { _: 'updatesTooLong' } as const
|
||||||
|
|
||||||
// todo: fix docs
|
// todo: fix docs
|
||||||
export class UpdatesManager {
|
export class UpdatesManager {
|
||||||
|
@ -124,10 +125,9 @@ export class UpdatesManager {
|
||||||
oldQts?: number
|
oldQts?: number
|
||||||
oldDate?: number
|
oldDate?: number
|
||||||
oldSeq?: number
|
oldSeq?: number
|
||||||
selfChanged = false // todo what is this?
|
|
||||||
|
|
||||||
// whether to catch up channels from the locally stored pts
|
// whether to catch up channels from the locally stored pts
|
||||||
catchUpChannels = false
|
catchingUp = false
|
||||||
catchUpOnStart = this.params.catchUp ?? false
|
catchUpOnStart = this.params.catchUp ?? false
|
||||||
|
|
||||||
cpts = new Map<number, number>()
|
cpts = new Map<number, number>()
|
||||||
|
@ -138,6 +138,8 @@ export class UpdatesManager {
|
||||||
log = this.client.log.create('updates')
|
log = this.client.log.create('updates')
|
||||||
private _handler: RawUpdateHandler = () => {}
|
private _handler: RawUpdateHandler = () => {}
|
||||||
|
|
||||||
|
private _onCatchingUp: (catchingUp: boolean) => void = () => {}
|
||||||
|
|
||||||
auth?: CurrentUserInfo | null // todo: do we need a local copy?
|
auth?: CurrentUserInfo | null // todo: do we need a local copy?
|
||||||
keepAliveInterval?: NodeJS.Timeout
|
keepAliveInterval?: NodeJS.Timeout
|
||||||
|
|
||||||
|
@ -161,15 +163,17 @@ export class UpdatesManager {
|
||||||
this._handler = handler
|
this._handler = handler
|
||||||
}
|
}
|
||||||
|
|
||||||
|
onCatchingUp(handler: (catchingUp: boolean) => void): void {
|
||||||
|
this._onCatchingUp = handler
|
||||||
|
}
|
||||||
|
|
||||||
destroy() {
|
destroy() {
|
||||||
this.stopLoop()
|
this.stopLoop()
|
||||||
}
|
}
|
||||||
|
|
||||||
notifyLoggedIn(self: CurrentUserInfo): void {
|
notifyLoggedIn(self: CurrentUserInfo): void {
|
||||||
this.auth = self
|
this.auth = self
|
||||||
this._fetchUpdatesState()
|
this.startLoop().catch((err) => this.client.emitError(err))
|
||||||
.then(() => this.startLoop())
|
|
||||||
.catch((err) => this.client.emitError(err))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
notifyLoggedOut(): void {
|
notifyLoggedOut(): void {
|
||||||
|
@ -185,7 +189,7 @@ export class UpdatesManager {
|
||||||
|
|
||||||
private _onKeepAlive() {
|
private _onKeepAlive() {
|
||||||
this.log.debug('no updates for >15 minutes, catching up')
|
this.log.debug('no updates for >15 minutes, catching up')
|
||||||
this.handleUpdate({ _: 'updatesTooLong' })
|
this.handleUpdate(UPDATES_TOO_LONG)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -198,6 +202,7 @@ export class UpdatesManager {
|
||||||
*
|
*
|
||||||
* > **Note**: If you are using {@link UpdatesManagerParams.catchUp} option,
|
* > **Note**: If you are using {@link UpdatesManagerParams.catchUp} option,
|
||||||
* > catching up will be done in background, you can't await it.
|
* > catching up will be done in background, you can't await it.
|
||||||
|
* > Instead, listen for the `updating` and `connected` connection state events.
|
||||||
*/
|
*/
|
||||||
async startLoop(): Promise<void> {
|
async startLoop(): Promise<void> {
|
||||||
if (this.updatesLoopActive) return
|
if (this.updatesLoopActive) return
|
||||||
|
@ -209,6 +214,7 @@ export class UpdatesManager {
|
||||||
|
|
||||||
// start updates loop in background
|
// start updates loop in background
|
||||||
this.updatesLoopActive = true
|
this.updatesLoopActive = true
|
||||||
|
clearInterval(this.keepAliveInterval)
|
||||||
this.keepAliveInterval = setInterval(this._onKeepAlive, KEEP_ALIVE_INTERVAL)
|
this.keepAliveInterval = setInterval(this._onKeepAlive, KEEP_ALIVE_INTERVAL)
|
||||||
this._loop().catch((err) => this.client.emitError(err))
|
this._loop().catch((err) => this.client.emitError(err))
|
||||||
|
|
||||||
|
@ -246,16 +252,23 @@ export class UpdatesManager {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Catch up with the server by loading missed updates.
|
* Catch up with the server by loading missed updates.
|
||||||
*
|
*.
|
||||||
* > **Note**: In case the storage was not properly
|
* > **Note**: In case the storage was not properly
|
||||||
* > closed the last time, "catching up" might
|
* > closed the last time, "catching up" might
|
||||||
* > result in duplicate updates.
|
* > result in duplicate updates.
|
||||||
*/
|
*/
|
||||||
catchUp(): void {
|
catchUp(): void {
|
||||||
|
if (!this.updatesLoopActive) {
|
||||||
|
this.log.warn('catch up requested, but updates loop is not active, ignoring')
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
this.log.debug('catch up requested')
|
this.log.debug('catch up requested')
|
||||||
|
|
||||||
this.catchUpChannels = true
|
this._onCatchingUp(true)
|
||||||
this.handleUpdate({ _: 'updatesTooLong' })
|
this.catchingUp = true
|
||||||
|
this.handleUpdate(UPDATES_TOO_LONG)
|
||||||
}
|
}
|
||||||
|
|
||||||
handleClientUpdate(update: tl.TypeUpdates, noDispatch = true): void {
|
handleClientUpdate(update: tl.TypeUpdates, noDispatch = true): void {
|
||||||
|
@ -767,7 +780,7 @@ export class UpdatesManager {
|
||||||
|
|
||||||
let _pts: number | null | undefined = cpts.get(channelId)
|
let _pts: number | null | undefined = cpts.get(channelId)
|
||||||
|
|
||||||
if (!_pts && this.catchUpChannels) {
|
if (!_pts && this.catchingUp) {
|
||||||
_pts = await client.storage.updates.getChannelPts(channelId)
|
_pts = await client.storage.updates.getChannelPts(channelId)
|
||||||
}
|
}
|
||||||
if (!_pts) _pts = fallbackPts
|
if (!_pts) _pts = fallbackPts
|
||||||
|
@ -937,93 +950,94 @@ export class UpdatesManager {
|
||||||
async _fetchDifference(requestedDiff: Map<number, Promise<void>>): Promise<void> {
|
async _fetchDifference(requestedDiff: Map<number, Promise<void>>): Promise<void> {
|
||||||
const { client, log, pendingPtsUpdates, pendingUnorderedUpdates } = this
|
const { client, log, pendingPtsUpdates, pendingUnorderedUpdates } = this
|
||||||
|
|
||||||
for (;;) {
|
const diff = await client.call({
|
||||||
const diff = await client.call({
|
_: 'updates.getDifference',
|
||||||
_: 'updates.getDifference',
|
pts: this.pts!,
|
||||||
pts: this.pts!,
|
date: this.date!,
|
||||||
date: this.date!,
|
qts: this.qts!,
|
||||||
qts: this.qts!,
|
})
|
||||||
})
|
|
||||||
|
|
||||||
switch (diff._) {
|
switch (diff._) {
|
||||||
case 'updates.differenceEmpty':
|
case 'updates.differenceEmpty':
|
||||||
log.debug('updates.getDifference returned updates.differenceEmpty')
|
log.debug('updates.getDifference returned updates.differenceEmpty')
|
||||||
|
|
||||||
return
|
return
|
||||||
case 'updates.differenceTooLong':
|
case 'updates.differenceTooLong':
|
||||||
this.pts = diff.pts
|
this.pts = diff.pts
|
||||||
log.debug('updates.getDifference returned updates.differenceTooLong')
|
log.debug('updates.getDifference returned updates.differenceTooLong')
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
const fetchedState = diff._ === 'updates.difference' ? diff.state : diff.intermediateState
|
const fetchedState = diff._ === 'updates.difference' ? diff.state : diff.intermediateState
|
||||||
|
|
||||||
log.debug(
|
log.debug(
|
||||||
'updates.getDifference returned %d messages, %d updates. new pts: %d, qts: %d, seq: %d, final: %b',
|
'updates.getDifference returned %d messages, %d updates. new pts: %d, qts: %d, seq: %d, final: %b',
|
||||||
diff.newMessages.length,
|
diff.newMessages.length,
|
||||||
diff.otherUpdates.length,
|
diff.otherUpdates.length,
|
||||||
fetchedState.pts,
|
fetchedState.pts,
|
||||||
fetchedState.qts,
|
fetchedState.qts,
|
||||||
fetchedState.seq,
|
fetchedState.seq,
|
||||||
diff._ === 'updates.difference',
|
diff._ === 'updates.difference',
|
||||||
)
|
)
|
||||||
|
|
||||||
const peers = PeersIndex.from(diff)
|
const peers = PeersIndex.from(diff)
|
||||||
|
|
||||||
diff.newMessages.forEach((message) => {
|
diff.newMessages.forEach((message) => {
|
||||||
log.debug('processing message %d in %j (%s) from common diff', message.id, message.peerId, message._)
|
log.debug('processing message %d in %j (%s) from common diff', message.id, message.peerId, message._)
|
||||||
|
|
||||||
if (message._ === 'messageEmpty') return
|
if (message._ === 'messageEmpty') return
|
||||||
|
|
||||||
// pts does not need to be checked for them
|
// pts does not need to be checked for them
|
||||||
pendingUnorderedUpdates.pushBack(toPendingUpdate(messageToUpdate(message), peers, true))
|
pendingUnorderedUpdates.pushBack(toPendingUpdate(messageToUpdate(message), peers, true))
|
||||||
})
|
})
|
||||||
|
|
||||||
diff.otherUpdates.forEach((upd) => {
|
|
||||||
if (upd._ === 'updateChannelTooLong') {
|
|
||||||
log.debug(
|
|
||||||
'received updateChannelTooLong for channel %d in common diff (pts = %d), fetching diff',
|
|
||||||
upd.channelId,
|
|
||||||
upd.pts,
|
|
||||||
)
|
|
||||||
|
|
||||||
this._fetchChannelDifferenceLater(requestedDiff, upd.channelId, upd.pts)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if (isMessageEmpty(upd)) return
|
|
||||||
|
|
||||||
const parsed = toPendingUpdate(upd, peers, true)
|
|
||||||
|
|
||||||
if (parsed.channelId && parsed.ptsBefore) {
|
|
||||||
// we need to check pts for these updates, put into pts queue
|
|
||||||
pendingPtsUpdates.add(parsed)
|
|
||||||
} else {
|
|
||||||
// the updates are in order already, we can treat them as unordered
|
|
||||||
pendingUnorderedUpdates.pushBack(parsed)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
diff.otherUpdates.forEach((upd) => {
|
||||||
|
if (upd._ === 'updateChannelTooLong') {
|
||||||
log.debug(
|
log.debug(
|
||||||
'received %s from common diff, cid: %d, pts_before: %d, pts: %d, qts_before: %d',
|
'received updateChannelTooLong for channel %d in common diff (pts = %d), fetching diff',
|
||||||
upd._,
|
upd.channelId,
|
||||||
parsed.channelId,
|
upd.pts,
|
||||||
parsed.ptsBefore,
|
|
||||||
parsed.pts,
|
|
||||||
parsed.qtsBefore,
|
|
||||||
)
|
)
|
||||||
})
|
|
||||||
|
|
||||||
this.pts = fetchedState.pts
|
this._fetchChannelDifferenceLater(requestedDiff, upd.channelId, upd.pts)
|
||||||
this.qts = fetchedState.qts
|
|
||||||
this.seq = fetchedState.seq
|
|
||||||
this.date = fetchedState.date
|
|
||||||
|
|
||||||
if (diff._ === 'updates.difference') {
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (isMessageEmpty(upd)) return
|
||||||
|
|
||||||
|
const parsed = toPendingUpdate(upd, peers, true)
|
||||||
|
|
||||||
|
if (parsed.channelId && parsed.ptsBefore) {
|
||||||
|
// we need to check pts for these updates, put into pts queue
|
||||||
|
pendingPtsUpdates.add(parsed)
|
||||||
|
} else {
|
||||||
|
// the updates are in order already, we can treat them as unordered
|
||||||
|
pendingUnorderedUpdates.pushBack(parsed)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.debug(
|
||||||
|
'received %s from common diff, cid: %d, pts_before: %d, pts: %d, qts_before: %d',
|
||||||
|
upd._,
|
||||||
|
parsed.channelId,
|
||||||
|
parsed.ptsBefore,
|
||||||
|
parsed.pts,
|
||||||
|
parsed.qtsBefore,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
this.pts = fetchedState.pts
|
||||||
|
this.qts = fetchedState.qts
|
||||||
|
this.seq = fetchedState.seq
|
||||||
|
this.date = fetchedState.date
|
||||||
|
|
||||||
|
if (diff._ === 'updates.difference') {
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// fetch the next chunk in next tick
|
||||||
|
this.handleUpdate(UPDATES_TOO_LONG)
|
||||||
}
|
}
|
||||||
|
|
||||||
_fetchDifferenceLater(requestedDiff: Map<number, Promise<void>>): void {
|
_fetchDifferenceLater(requestedDiff: Map<number, Promise<void>>): void {
|
||||||
|
@ -1294,6 +1308,13 @@ export class UpdatesManager {
|
||||||
this.hasTimedoutPostponed
|
this.hasTimedoutPostponed
|
||||||
)
|
)
|
||||||
) {
|
) {
|
||||||
|
if (this.catchingUp) {
|
||||||
|
// consider catching up completed if there are no more updates
|
||||||
|
this.log.debug('catching up completed')
|
||||||
|
this.catchingUp = false
|
||||||
|
this._onCatchingUp(false)
|
||||||
|
}
|
||||||
|
|
||||||
await updatesLoopCv.wait()
|
await updatesLoopCv.wait()
|
||||||
}
|
}
|
||||||
if (!this.updatesLoopActive) break
|
if (!this.updatesLoopActive) break
|
||||||
|
@ -1310,7 +1331,8 @@ export class UpdatesManager {
|
||||||
|
|
||||||
const requestedDiff = new Map<number, Promise<void>>()
|
const requestedDiff = new Map<number, Promise<void>>()
|
||||||
|
|
||||||
// first process pending containers
|
this.log.debug('processing pending containers')
|
||||||
|
|
||||||
while (pendingUpdateContainers.length) {
|
while (pendingUpdateContainers.length) {
|
||||||
const { upd, seqStart, seqEnd } = pendingUpdateContainers.popFront()!
|
const { upd, seqStart, seqEnd } = pendingUpdateContainers.popFront()!
|
||||||
|
|
||||||
|
@ -1516,7 +1538,8 @@ export class UpdatesManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// process pts-ordered updates
|
this.log.debug('processing pending pts-ordered updates')
|
||||||
|
|
||||||
while (pendingPtsUpdates.length) {
|
while (pendingPtsUpdates.length) {
|
||||||
const pending = pendingPtsUpdates.popFront()!
|
const pending = pendingPtsUpdates.popFront()!
|
||||||
const upd = pending.update
|
const upd = pending.update
|
||||||
|
@ -1528,7 +1551,7 @@ export class UpdatesManager {
|
||||||
if (!pending.channelId) localPts = this.pts!
|
if (!pending.channelId) localPts = this.pts!
|
||||||
else if (cpts.has(pending.channelId)) {
|
else if (cpts.has(pending.channelId)) {
|
||||||
localPts = cpts.get(pending.channelId)!
|
localPts = cpts.get(pending.channelId)!
|
||||||
} else if (this.catchUpChannels) {
|
} else if (this.catchingUp) {
|
||||||
// only load stored channel pts in case
|
// only load stored channel pts in case
|
||||||
// the user has enabled catching up.
|
// the user has enabled catching up.
|
||||||
// not loading stored pts effectively disables
|
// not loading stored pts effectively disables
|
||||||
|
@ -1617,7 +1640,8 @@ export class UpdatesManager {
|
||||||
await this._onUpdate(pending, requestedDiff)
|
await this._onUpdate(pending, requestedDiff)
|
||||||
}
|
}
|
||||||
|
|
||||||
// process postponed pts-ordered updates
|
this.log.debug('processing postponed pts-ordered updates')
|
||||||
|
|
||||||
for (let item = pendingPtsUpdatesPostponed._first; item; item = item.n) {
|
for (let item = pendingPtsUpdatesPostponed._first; item; item = item.n) {
|
||||||
// awesome fucking iteration because i'm so fucking tired and wanna kms
|
// awesome fucking iteration because i'm so fucking tired and wanna kms
|
||||||
const pending = item.v
|
const pending = item.v
|
||||||
|
@ -1692,7 +1716,8 @@ export class UpdatesManager {
|
||||||
pendingPtsUpdatesPostponed._remove(item)
|
pendingPtsUpdatesPostponed._remove(item)
|
||||||
}
|
}
|
||||||
|
|
||||||
// process qts-ordered updates
|
this.log.debug('processing pending qts-ordered updates')
|
||||||
|
|
||||||
while (pendingQtsUpdates.length) {
|
while (pendingQtsUpdates.length) {
|
||||||
const pending = pendingQtsUpdates.popFront()!
|
const pending = pendingQtsUpdates.popFront()!
|
||||||
const upd = pending.update
|
const upd = pending.update
|
||||||
|
@ -1746,7 +1771,8 @@ export class UpdatesManager {
|
||||||
await this._onUpdate(pending, requestedDiff)
|
await this._onUpdate(pending, requestedDiff)
|
||||||
}
|
}
|
||||||
|
|
||||||
// process postponed qts-ordered updates
|
this.log.debug('processing postponed qts-ordered updates')
|
||||||
|
|
||||||
for (let item = pendingQtsUpdatesPostponed._first; item; item = item.n) {
|
for (let item = pendingQtsUpdatesPostponed._first; item; item = item.n) {
|
||||||
// awesome fucking iteration because i'm so fucking tired and wanna kms
|
// awesome fucking iteration because i'm so fucking tired and wanna kms
|
||||||
const pending = item.v
|
const pending = item.v
|
||||||
|
@ -1797,7 +1823,6 @@ export class UpdatesManager {
|
||||||
|
|
||||||
this.hasTimedoutPostponed = false
|
this.hasTimedoutPostponed = false
|
||||||
|
|
||||||
// wait for all pending diffs to load
|
|
||||||
while (requestedDiff.size) {
|
while (requestedDiff.size) {
|
||||||
log.debug(
|
log.debug(
|
||||||
'waiting for %d pending diffs before processing unordered: %J',
|
'waiting for %d pending diffs before processing unordered: %J',
|
||||||
|
@ -1815,7 +1840,8 @@ export class UpdatesManager {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// process unordered updates (or updates received from diff)
|
this.log.debug('processing pending unordered updates')
|
||||||
|
|
||||||
while (pendingUnorderedUpdates.length) {
|
while (pendingUnorderedUpdates.length) {
|
||||||
const pending = pendingUnorderedUpdates.popFront()!
|
const pending = pendingUnorderedUpdates.popFront()!
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import { tl } from '@mtcute/tl'
|
import { tl } from '@mtcute/tl'
|
||||||
|
|
||||||
import { LogManager } from '../../utils/logger.js'
|
import { LogManager } from '../../utils/logger.js'
|
||||||
import { ITelegramClient } from '../client.types.js'
|
import { ConnectionState, ITelegramClient } from '../client.types.js'
|
||||||
import { PeersIndex } from '../types/peers/peers-index.js'
|
import { PeersIndex } from '../types/peers/peers-index.js'
|
||||||
import { RawUpdateHandler } from '../updates/types.js'
|
import { RawUpdateHandler } from '../updates/types.js'
|
||||||
import { AppConfigManagerProxy } from './app-config.js'
|
import { AppConfigManagerProxy } from './app-config.js'
|
||||||
|
@ -38,6 +38,11 @@ export abstract class TelegramWorkerPort<Custom extends WorkerCustomMethods> imp
|
||||||
this._updateHandler = handler
|
this._updateHandler = handler
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private _connectionStateHandler: (state: ConnectionState) => void = () => {}
|
||||||
|
onConnectionState(handler: (state: ConnectionState) => void): void {
|
||||||
|
this._connectionStateHandler = handler
|
||||||
|
}
|
||||||
|
|
||||||
private _onMessage: ClientMessageHandler = (message) => {
|
private _onMessage: ClientMessageHandler = (message) => {
|
||||||
switch (message.type) {
|
switch (message.type) {
|
||||||
case 'log':
|
case 'log':
|
||||||
|
|
|
@ -301,6 +301,8 @@ export class MtClient extends EventEmitter {
|
||||||
useIpv6: Boolean(params.useIpv6),
|
useIpv6: Boolean(params.useIpv6),
|
||||||
enableErrorReporting: params.enableErrorReporting ?? false,
|
enableErrorReporting: params.enableErrorReporting ?? false,
|
||||||
onUsable: () => this.emit('usable'),
|
onUsable: () => this.emit('usable'),
|
||||||
|
onConnecting: () => this.emit('connecting'),
|
||||||
|
onNetworkChanged: (connected) => this.emit('networkChanged', connected),
|
||||||
onUpdate: (upd) => this.emit('update', upd),
|
onUpdate: (upd) => this.emit('update', upd),
|
||||||
...params.network,
|
...params.network,
|
||||||
},
|
},
|
||||||
|
|
|
@ -184,6 +184,7 @@ export class MultiSessionConnection extends EventEmitter {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
conn.on('usable', () => this.emit('usable', i))
|
conn.on('usable', () => this.emit('usable', i))
|
||||||
|
conn.on('wait', () => this.emit('wait', i))
|
||||||
conn.on('request-auth', () => this.emit('request-auth', i))
|
conn.on('request-auth', () => this.emit('request-auth', i))
|
||||||
conn.on('flood-done', () => {
|
conn.on('flood-done', () => {
|
||||||
this._log.debug('received flood-done from connection %d', i)
|
this._log.debug('received flood-done from connection %d', i)
|
||||||
|
@ -297,6 +298,12 @@ export class MultiSessionConnection extends EventEmitter {
|
||||||
// connection is idle, we don't need to notify it
|
// connection is idle, we don't need to notify it
|
||||||
}
|
}
|
||||||
|
|
||||||
|
notifyNetworkChanged(connected: boolean): void {
|
||||||
|
for (const conn of this._connections) {
|
||||||
|
conn.notifyNetworkChanged(connected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
requestAuth(): void {
|
requestAuth(): void {
|
||||||
this._connections[0]._authorize()
|
this._connections[0]._authorize()
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,6 +59,8 @@ export interface NetworkManagerParams {
|
||||||
emitError: (err: Error, connection?: SessionConnection) => void
|
emitError: (err: Error, connection?: SessionConnection) => void
|
||||||
onUpdate: (upd: tl.TypeUpdates) => void
|
onUpdate: (upd: tl.TypeUpdates) => void
|
||||||
onUsable: () => void
|
onUsable: () => void
|
||||||
|
onConnecting: () => void
|
||||||
|
onNetworkChanged: (connected: boolean) => void
|
||||||
}
|
}
|
||||||
|
|
||||||
export type ConnectionCountDelegate = (kind: ConnectionKind, dcId: number, isPremium: boolean) => number
|
export type ConnectionCountDelegate = (kind: ConnectionKind, dcId: number, isPremium: boolean) => number
|
||||||
|
@ -490,6 +492,8 @@ export class NetworkManager {
|
||||||
return { main, media }
|
return { main, media }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private _resetOnNetworkChange?: () => void
|
||||||
|
|
||||||
private _switchPrimaryDc(dc: DcConnectionManager) {
|
private _switchPrimaryDc(dc: DcConnectionManager) {
|
||||||
if (this._primaryDc && this._primaryDc !== dc) {
|
if (this._primaryDc && this._primaryDc !== dc) {
|
||||||
this._primaryDc.setIsPrimary(false)
|
this._primaryDc.setIsPrimary(false)
|
||||||
|
@ -498,9 +502,16 @@ export class NetworkManager {
|
||||||
this._primaryDc = dc
|
this._primaryDc = dc
|
||||||
dc.setIsPrimary(true)
|
dc.setIsPrimary(true)
|
||||||
|
|
||||||
|
this.params.onConnecting()
|
||||||
|
|
||||||
dc.main.on('usable', () => {
|
dc.main.on('usable', () => {
|
||||||
|
if (dc !== this._primaryDc) return
|
||||||
this.params.onUsable()
|
this.params.onUsable()
|
||||||
})
|
})
|
||||||
|
dc.main.on('wait', () => {
|
||||||
|
if (dc !== this._primaryDc) return
|
||||||
|
this.params.onConnecting()
|
||||||
|
})
|
||||||
dc.main.on('update', (update: tl.TypeUpdates) => {
|
dc.main.on('update', (update: tl.TypeUpdates) => {
|
||||||
this._updateHandler(update, false)
|
this._updateHandler(update, false)
|
||||||
})
|
})
|
||||||
|
@ -557,6 +568,8 @@ export class NetworkManager {
|
||||||
throw new MtArgumentError('DC manager already exists')
|
throw new MtArgumentError('DC manager already exists')
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this._resetOnNetworkChange = getPlatform().onNetworkChanged?.(this.notifyNetworkChanged.bind(this))
|
||||||
|
|
||||||
const dc = new DcConnectionManager(this, defaultDcs.main.id, defaultDcs, true)
|
const dc = new DcConnectionManager(this, defaultDcs.main.id, defaultDcs, true)
|
||||||
this._dcConnections.set(defaultDcs.main.id, dc)
|
this._dcConnections.set(defaultDcs.main.id, dc)
|
||||||
await this._switchPrimaryDc(dc)
|
await this._switchPrimaryDc(dc)
|
||||||
|
@ -648,6 +661,18 @@ export class NetworkManager {
|
||||||
this.resetSessions()
|
this.resetSessions()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
notifyNetworkChanged(connected: boolean): void {
|
||||||
|
this._log.debug('network changed: %s', connected ? 'connected' : 'disconnected')
|
||||||
|
this.params.onNetworkChanged(connected)
|
||||||
|
|
||||||
|
for (const dc of this._dcConnections.values()) {
|
||||||
|
dc.main.notifyNetworkChanged(connected)
|
||||||
|
dc.upload.notifyNetworkChanged(connected)
|
||||||
|
dc.download.notifyNetworkChanged(connected)
|
||||||
|
dc.downloadSmall.notifyNetworkChanged(connected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
resetSessions(): void {
|
resetSessions(): void {
|
||||||
const dc = this._primaryDc
|
const dc = this._primaryDc
|
||||||
if (!dc) return
|
if (!dc) return
|
||||||
|
@ -837,5 +862,6 @@ export class NetworkManager {
|
||||||
dc.destroy()
|
dc.destroy()
|
||||||
}
|
}
|
||||||
this.config.offReload(this._onConfigChanged)
|
this.config.offReload(this._onConfigChanged)
|
||||||
|
this._resetOnNetworkChange?.()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,8 @@ export abstract class PersistentConnection extends EventEmitter {
|
||||||
private _consequentFails = 0
|
private _consequentFails = 0
|
||||||
private _previousWait: number | null = null
|
private _previousWait: number | null = null
|
||||||
private _reconnectionTimeout: NodeJS.Timeout | null = null
|
private _reconnectionTimeout: NodeJS.Timeout | null = null
|
||||||
|
private _shouldReconnectImmediately = false
|
||||||
|
protected _disconnectedManually = false
|
||||||
|
|
||||||
// inactivity timeout
|
// inactivity timeout
|
||||||
private _inactivityTimeout: NodeJS.Timeout | null = null
|
private _inactivityTimeout: NodeJS.Timeout | null = null
|
||||||
|
@ -128,7 +130,14 @@ export abstract class PersistentConnection extends EventEmitter {
|
||||||
onTransportClose(): void {
|
onTransportClose(): void {
|
||||||
// transport closed because of inactivity
|
// transport closed because of inactivity
|
||||||
// obviously we dont want to reconnect then
|
// obviously we dont want to reconnect then
|
||||||
if (this._inactive) return
|
if (this._inactive || this._disconnectedManually) return
|
||||||
|
|
||||||
|
if (this._shouldReconnectImmediately) {
|
||||||
|
this._shouldReconnectImmediately = false
|
||||||
|
this.connect()
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
this._consequentFails += 1
|
this._consequentFails += 1
|
||||||
|
|
||||||
|
@ -169,13 +178,31 @@ export abstract class PersistentConnection extends EventEmitter {
|
||||||
|
|
||||||
if (this._reconnectionTimeout != null) {
|
if (this._reconnectionTimeout != null) {
|
||||||
clearTimeout(this._reconnectionTimeout)
|
clearTimeout(this._reconnectionTimeout)
|
||||||
|
this._reconnectionTimeout = null
|
||||||
}
|
}
|
||||||
|
|
||||||
this._inactive = false
|
this._inactive = false
|
||||||
|
this._disconnectedManually = false
|
||||||
this._transport.connect(this.params.dc, this.params.testMode)
|
this._transport.connect(this.params.dc, this.params.testMode)
|
||||||
}
|
}
|
||||||
|
|
||||||
reconnect(): void {
|
reconnect(): void {
|
||||||
|
if (this._inactive) return
|
||||||
|
|
||||||
|
// if we are already connected
|
||||||
|
if (this.isConnected) {
|
||||||
|
this._shouldReconnectImmediately = true
|
||||||
|
this._transport.close()
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// if reconnection timeout is pending, it will be cancelled in connect()
|
||||||
|
this.connect()
|
||||||
|
}
|
||||||
|
|
||||||
|
disconnectManual(): void {
|
||||||
|
this._disconnectedManually = true
|
||||||
this._transport.close()
|
this._transport.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@ import Long from 'long'
|
||||||
import { mtp, tl } from '@mtcute/tl'
|
import { mtp, tl } from '@mtcute/tl'
|
||||||
import { TlBinaryReader, TlBinaryWriter, TlReaderMap, TlSerializationCounter, TlWriterMap } from '@mtcute/tl-runtime'
|
import { TlBinaryReader, TlBinaryWriter, TlReaderMap, TlSerializationCounter, TlWriterMap } from '@mtcute/tl-runtime'
|
||||||
|
|
||||||
|
import { getPlatform } from '../platform.js'
|
||||||
import { MtArgumentError, MtcuteError, MtTimeoutError } from '../types/index.js'
|
import { MtArgumentError, MtcuteError, MtTimeoutError } from '../types/index.js'
|
||||||
import { createAesIgeForMessageOld } from '../utils/crypto/mtproto.js'
|
import { createAesIgeForMessageOld } from '../utils/crypto/mtproto.js'
|
||||||
import { reportUnknownError } from '../utils/error-reporting.js'
|
import { reportUnknownError } from '../utils/error-reporting.js'
|
||||||
|
@ -103,6 +104,8 @@ export class SessionConnection extends PersistentConnection {
|
||||||
this._handleRawMessage = this._handleRawMessage.bind(this)
|
this._handleRawMessage = this._handleRawMessage.bind(this)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private _online = getPlatform().isOnline?.() ?? true
|
||||||
|
|
||||||
getAuthKey(temp = false): Uint8Array | null {
|
getAuthKey(temp = false): Uint8Array | null {
|
||||||
const key = temp ? this._session._authKeyTemp : this._session._authKey
|
const key = temp ? this._session._authKeyTemp : this._session._authKey
|
||||||
|
|
||||||
|
@ -754,7 +757,7 @@ export class SessionConnection extends PersistentConnection {
|
||||||
|
|
||||||
rpc.done = true
|
rpc.done = true
|
||||||
|
|
||||||
this.log.verbose('<<< (%s) %j', rpc.method, result)
|
this.log.verbose('<<< (%s:%l) %j', rpc.method, reqMsgId, result)
|
||||||
|
|
||||||
if (result._ === 'mt_rpc_error') {
|
if (result._ === 'mt_rpc_error') {
|
||||||
const res = result as mtp.RawMt_rpc_error
|
const res = result as mtp.RawMt_rpc_error
|
||||||
|
@ -1435,6 +1438,16 @@ export class SessionConnection extends PersistentConnection {
|
||||||
return pending.promise
|
return pending.promise
|
||||||
}
|
}
|
||||||
|
|
||||||
|
notifyNetworkChanged(online: boolean): void {
|
||||||
|
this._online = online
|
||||||
|
|
||||||
|
if (online) {
|
||||||
|
this.reconnect()
|
||||||
|
} else {
|
||||||
|
this.disconnectManual()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private _cancelRpc(rpc: PendingRpc, onTimeout = false, abortSignal?: AbortSignal): void {
|
private _cancelRpc(rpc: PendingRpc, onTimeout = false, abortSignal?: AbortSignal): void {
|
||||||
if (rpc.done) return
|
if (rpc.done) return
|
||||||
|
|
||||||
|
@ -1510,12 +1523,18 @@ export class SessionConnection extends PersistentConnection {
|
||||||
}
|
}
|
||||||
|
|
||||||
private _flush(): void {
|
private _flush(): void {
|
||||||
if (!this._session._authKey.ready || this._isPfsBindingPending || this._session.current429Timeout) {
|
if (
|
||||||
|
this._disconnectedManually ||
|
||||||
|
!this._session._authKey.ready ||
|
||||||
|
this._isPfsBindingPending ||
|
||||||
|
this._session.current429Timeout
|
||||||
|
) {
|
||||||
this.log.debug(
|
this.log.debug(
|
||||||
'skipping flush, connection is not usable (auth key ready = %b, pfs binding pending = %b, 429 timeout = %b)',
|
'skipping flush, connection is not usable (offline = %b, auth key ready = %b, pfs binding pending = %b, 429 timeout = %b)',
|
||||||
|
this._disconnectedManually,
|
||||||
this._session._authKey.ready,
|
this._session._authKey.ready,
|
||||||
this._isPfsBindingPending,
|
this._isPfsBindingPending,
|
||||||
Boolean(this._session.current429Timeout),
|
this._session.current429Timeout,
|
||||||
)
|
)
|
||||||
|
|
||||||
// it will be flushed once connection is usable
|
// it will be flushed once connection is usable
|
||||||
|
|
|
@ -14,6 +14,8 @@ export interface ICorePlatform extends ITlPlatform {
|
||||||
fileSize?: number
|
fileSize?: number
|
||||||
fileName?: string
|
fileName?: string
|
||||||
} | null>
|
} | null>
|
||||||
|
onNetworkChanged?(fn: (connected: boolean) => void): () => void
|
||||||
|
isOnline?(): boolean
|
||||||
}
|
}
|
||||||
|
|
||||||
// eslint-disable-next-line
|
// eslint-disable-next-line
|
||||||
|
|
|
@ -41,11 +41,19 @@ export interface BaseTelegramClientOptions
|
||||||
* @default `"client.session"`
|
* @default `"client.session"`
|
||||||
*/
|
*/
|
||||||
storage?: string | ITelegramStorageProvider
|
storage?: string | ITelegramStorageProvider
|
||||||
|
|
||||||
|
/**
|
||||||
|
* **ADVANCED USE ONLY**
|
||||||
|
*
|
||||||
|
* Whether to not set up the platform.
|
||||||
|
* This is useful if you call `setPlatform` yourself.
|
||||||
|
*/
|
||||||
|
platformless?: boolean
|
||||||
}
|
}
|
||||||
|
|
||||||
export class BaseTelegramClient extends BaseTelegramClientBase {
|
export class BaseTelegramClient extends BaseTelegramClientBase {
|
||||||
constructor(opts: BaseTelegramClientOptions) {
|
constructor(opts: BaseTelegramClientOptions) {
|
||||||
setPlatform(new NodePlatform())
|
if (!opts.platformless) setPlatform(new NodePlatform())
|
||||||
|
|
||||||
super({
|
super({
|
||||||
// eslint-disable-next-line
|
// eslint-disable-next-line
|
||||||
|
|
|
@ -25,11 +25,19 @@ export interface BaseTelegramClientOptions
|
||||||
* @default `"client.session"`
|
* @default `"client.session"`
|
||||||
*/
|
*/
|
||||||
storage?: string | ITelegramStorageProvider
|
storage?: string | ITelegramStorageProvider
|
||||||
|
|
||||||
|
/**
|
||||||
|
* **ADVANCED USE ONLY**
|
||||||
|
*
|
||||||
|
* Whether to not set up the platform.
|
||||||
|
* This is useful if you call `setPlatform` yourself.
|
||||||
|
*/
|
||||||
|
platformless?: boolean
|
||||||
}
|
}
|
||||||
|
|
||||||
export class BaseTelegramClient extends BaseTelegramClientBase {
|
export class BaseTelegramClient extends BaseTelegramClientBase {
|
||||||
constructor(opts: BaseTelegramClientOptions) {
|
constructor(opts: BaseTelegramClientOptions) {
|
||||||
setPlatform(new WebPlatform())
|
if (!opts.platformless) setPlatform(new WebPlatform())
|
||||||
|
|
||||||
super({
|
super({
|
||||||
crypto: new WebCryptoProvider(),
|
crypto: new WebCryptoProvider(),
|
||||||
|
|
|
@ -29,6 +29,23 @@ export class WebPlatform implements ICorePlatform {
|
||||||
return null
|
return null
|
||||||
}
|
}
|
||||||
|
|
||||||
|
onNetworkChanged(fn: (connected: boolean) => void) {
|
||||||
|
if (!('onLine' in navigator)) return () => {}
|
||||||
|
|
||||||
|
const onlineHandler = () => fn(navigator.onLine)
|
||||||
|
window.addEventListener('online', onlineHandler)
|
||||||
|
window.addEventListener('offline', onlineHandler)
|
||||||
|
|
||||||
|
return () => {
|
||||||
|
window.removeEventListener('online', onlineHandler)
|
||||||
|
window.removeEventListener('offline', onlineHandler)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
isOnline(): boolean {
|
||||||
|
return navigator.onLine
|
||||||
|
}
|
||||||
|
|
||||||
// ITlPlatform
|
// ITlPlatform
|
||||||
utf8ByteLength!: typeof utf8ByteLength
|
utf8ByteLength!: typeof utf8ByteLength
|
||||||
utf8Encode!: typeof utf8Encode
|
utf8Encode!: typeof utf8Encode
|
||||||
|
|
Loading…
Reference in a new issue