From b7751f0a574361cfc417b9894bddb5b071b7dd55 Mon Sep 17 00:00:00 2001 From: teidesu <86301490+teidesu@users.noreply.github.com> Date: Thu, 5 Aug 2021 20:14:19 +0300 Subject: [PATCH] fix: various fixes, improved updates handing --- packages/client/src/client.ts | 10 +++++ packages/client/src/methods/_imports.ts | 3 +- packages/client/src/methods/updates.ts | 8 +++- packages/core/src/base-client.ts | 38 +++++++++++-------- .../core/src/network/persistent-connection.ts | 4 +- .../core/src/network/telegram-connection.ts | 1 + 6 files changed, 45 insertions(+), 19 deletions(-) diff --git a/packages/client/src/client.ts b/packages/client/src/client.ts index 0f73159b..6f978547 100644 --- a/packages/client/src/client.ts +++ b/packages/client/src/client.ts @@ -149,6 +149,7 @@ import { _dispatchUpdate, _fetchUpdatesState, _handleUpdate, + _keepAliveAction, _loadStorage, _saveStorage, catchUp, @@ -172,6 +173,7 @@ import { Readable } from 'stream' import { ArrayWithTotal, BotCommands, + BotStoppedUpdate, CallbackQuery, Chat, ChatEvent, @@ -339,6 +341,13 @@ export interface TelegramClient extends BaseTelegramClient { * @param handler History read handler */ on(name: 'history_read', handler: (upd: HistoryReadUpdate) => void): this + /** + * Register a bot stopped handler + * + * @param name Event name + * @param handler Bot stopped handler + */ + on(name: 'bot_stopped', handler: (upd: BotStoppedUpdate) => void): this /** * Accept the given TOS * @@ -3660,6 +3669,7 @@ export class TelegramClient extends BaseTelegramClient { protected _dispatchUpdate = _dispatchUpdate _handleUpdate = _handleUpdate catchUp = catchUp + protected _keepAliveAction = _keepAliveAction blockUser = blockUser deleteProfilePhotos = deleteProfilePhotos getCommonChats = getCommonChats diff --git a/packages/client/src/methods/_imports.ts b/packages/client/src/methods/_imports.ts index 13a796fb..635818e9 100644 --- a/packages/client/src/methods/_imports.ts +++ b/packages/client/src/methods/_imports.ts @@ -52,7 +52,8 @@ import { PollVoteUpdate, UserStatusUpdate, UserTypingUpdate, - Conversation + Conversation, + BotStoppedUpdate } from '../types' // @copy diff --git a/packages/client/src/methods/updates.ts b/packages/client/src/methods/updates.ts index ca84d3d8..450d5563 100644 --- a/packages/client/src/methods/updates.ts +++ b/packages/client/src/methods/updates.ts @@ -10,7 +10,7 @@ import { getBarePeerId, getMarkedPeerId, markedPeerIdToBare, - MAX_CHANNEL_ID, + MAX_CHANNEL_ID, RpcError, } from '@mtqt/core' import { isDummyUpdate, isDummyUpdates } from '../utils/updates-utils' import { ChatsIndex, UsersIndex } from '../types' @@ -1002,3 +1002,9 @@ export function catchUp(this: TelegramClient): Promise { .then(() => this._updLock.release()) .then(() => this._saveStorage()) } + +/** @internal */ +export function _keepAliveAction(this: TelegramClient): void { + debug('no updates for >15 minutes, catching up') + this.catchUp().catch((err) => this._emitError(err)) +} diff --git a/packages/core/src/base-client.ts b/packages/core/src/base-client.ts index 64215ff2..4be75d32 100644 --- a/packages/core/src/base-client.ts +++ b/packages/core/src/base-client.ts @@ -236,7 +236,7 @@ export class BaseTelegramClient extends EventEmitter { readonly _layer: number private _keepAliveInterval?: NodeJS.Timeout - private _lastRequestTime = 0 + protected _lastUpdateTime = 0 private _floodWaitedRequests: Record = {} protected _config?: tl.RawConfig @@ -332,6 +332,19 @@ export class BaseTelegramClient extends EventEmitter { await this.storage.save?.() } + protected _keepAliveAction(): void { + // telegram asks to fetch pending updates + // if there are no updates for 15 minutes. + // core does not have update handling, + // so we just use getState so the server knows + // we still do need updates + this.call({ _: 'updates.getState' }).catch((e) => { + if (!(e instanceof RpcError)) { + this.primaryConnection.reconnect() + } + }) + } + private _cleanupPrimaryConnection(forever = false): void { if (forever && this.primaryConnection) this.primaryConnection.destroy() if (this._keepAliveInterval) clearInterval(this._keepAliveInterval) @@ -349,25 +362,19 @@ export class BaseTelegramClient extends EventEmitter { reconnectionStrategy: this._reconnectionStrategy, layer: this._layer, }) - this.primaryConnection.on('usable', async () => { + this.primaryConnection.on('usable', async (isReconnection: boolean) => { + this._lastUpdateTime = Date.now() + this._keepAliveInterval = setInterval(async () => { - // according to telethon, "We need to send some content-related request at least hourly - // for Telegram to keep delivering updates, otherwise they will just stop even if we're connected. - // Do so every 30 minutes" - if (Date.now() - this._lastRequestTime > 1800_000) { - try { - await this.call({ _: 'updates.getState' }) - } catch (e) { - if (!(e instanceof RpcError)) { - this.primaryConnection.reconnect() - } - } + if (Date.now() - this._lastUpdateTime > 900_000) { + this._keepAliveAction() + this._lastUpdateTime = Date.now() } }, 60_000) // on reconnection we need to call updates.getState so Telegram // knows we still want the updates - if (!this._disableUpdates) { + if (isReconnection && !this._disableUpdates) { setTimeout(async () => { try { await this.call({ _: 'updates.getState' }) @@ -380,6 +387,7 @@ export class BaseTelegramClient extends EventEmitter { } }) this.primaryConnection.on('update', (update) => { + this._lastUpdateTime = Date.now() this._handleUpdate(update) }) this.primaryConnection.on('wait', () => @@ -568,8 +576,6 @@ export class BaseTelegramClient extends EventEmitter { } as any // who cares } - this._lastRequestTime = Date.now() - const connection = params?.connection ?? this.primaryConnection let lastError: Error | null = null diff --git a/packages/core/src/network/persistent-connection.ts b/packages/core/src/network/persistent-connection.ts index ec6ade5b..75fbb30a 100644 --- a/packages/core/src/network/persistent-connection.ts +++ b/packages/core/src/network/persistent-connection.ts @@ -82,12 +82,14 @@ export abstract class PersistentConnection extends EventEmitter { } protected onConnectionUsable(): void { + const isReconnection = this._consequentFails > 0 + // reset reconnection related state this._lastError = null this._consequentFails = 0 this._previousWait = null this._usable = true - this.emit('usable') + this.emit('usable', isReconnection) this._rescheduleInactivity() } diff --git a/packages/core/src/network/telegram-connection.ts b/packages/core/src/network/telegram-connection.ts index b1786c1f..0c3da2a1 100644 --- a/packages/core/src/network/telegram-connection.ts +++ b/packages/core/src/network/telegram-connection.ts @@ -149,6 +149,7 @@ export class TelegramConnection extends PersistentConnection { it.promise.reject(new Error('Connection destroyed')) ) this._mtproto.reset() + this.removeAllListeners() } }