diff --git a/packages/core/src/highlevel/base.ts b/packages/core/src/highlevel/base.ts index aad8ac9f..d2e87812 100644 --- a/packages/core/src/highlevel/base.ts +++ b/packages/core/src/highlevel/base.ts @@ -321,4 +321,8 @@ export class BaseTelegramClient implements ITelegramClient { computeNewPasswordHash(algo: tl.TypePasswordKdfAlgo, password: string): Promise { return computeNewPasswordHash(this.crypto, algo, password) } + + get stopSignal(): AbortSignal { + return this.mt.stopSignal + } } diff --git a/packages/core/src/highlevel/client.ts b/packages/core/src/highlevel/client.ts index 0bfe3365..ec4d191f 100644 --- a/packages/core/src/highlevel/client.ts +++ b/packages/core/src/highlevel/client.ts @@ -5557,6 +5557,9 @@ export class TelegramClient extends EventEmitter implements ITelegramClient { this.log = this._client.log // @ts-expect-error codegen this.storage = this._client.storage + Object.defineProperty(this, 'stopSignal', { + get: () => this._client.stopSignal, + }) if (!opts.disableUpdates) { const skipConversationUpdates = opts.skipConversationUpdates ?? true diff --git a/packages/core/src/highlevel/client.types.ts b/packages/core/src/highlevel/client.types.ts index fd914152..970a0ec8 100644 --- a/packages/core/src/highlevel/client.types.ts +++ b/packages/core/src/highlevel/client.types.ts @@ -30,6 +30,7 @@ export interface ITelegramClient { readonly log: Logger readonly storage: PublicPart readonly appConfig: PublicPart + readonly stopSignal: AbortSignal prepare(): Promise connect(): Promise diff --git a/packages/core/src/highlevel/methods/_init.ts b/packages/core/src/highlevel/methods/_init.ts index 09d3c99b..fa334c73 100644 --- a/packages/core/src/highlevel/methods/_init.ts +++ b/packages/core/src/highlevel/methods/_init.ts @@ -64,6 +64,9 @@ function _initializeClient(this: TelegramClient, opts: TelegramClientOptions) { this.log = this._client.log // @ts-expect-error codegen this.storage = this._client.storage + Object.defineProperty(this, 'stopSignal', { + get: () => this._client.stopSignal, + }) if (!opts.disableUpdates) { const skipConversationUpdates = opts.skipConversationUpdates ?? true diff --git a/packages/core/src/highlevel/methods/chats/kick-chat-member.ts b/packages/core/src/highlevel/methods/chats/kick-chat-member.ts index 15a0f495..c858582a 100644 --- a/packages/core/src/highlevel/methods/chats/kick-chat-member.ts +++ b/packages/core/src/highlevel/methods/chats/kick-chat-member.ts @@ -1,4 +1,4 @@ -import { sleep } from '../../../utils/misc-utils.js' +import { sleepWithAbort } from '../../../utils/misc-utils.js' import { ITelegramClient } from '../../client.types.js' import { InputPeerLike, Message } from '../../types/index.js' import { isInputPeerChannel } from '../../utils/peer-utils.js' @@ -32,7 +32,7 @@ export async function kickChatMember( // not needed in case this is a legacy group if (isInputPeerChannel(chat)) { // i fucking love telegram serverside race conditions - await sleep(1000) + await sleepWithAbort(1000, client.stopSignal) await unbanChatMember(client, { chatId: chat, participantId: user }) } diff --git a/packages/core/src/highlevel/worker/port.ts b/packages/core/src/highlevel/worker/port.ts index 1a24cec6..4fecea7b 100644 --- a/packages/core/src/highlevel/worker/port.ts +++ b/packages/core/src/highlevel/worker/port.ts @@ -42,6 +42,9 @@ export abstract class TelegramWorkerPort imp readonly startUpdatesLoop readonly stopUpdatesLoop + private _abortController = new AbortController() + readonly stopSignal = this._abortController.signal + constructor(readonly options: TelegramWorkerPortOptions) { this.log = new LogManager('worker') @@ -122,6 +125,9 @@ export abstract class TelegramWorkerPort imp case 'error': this.emitError(message.error) break + case 'stop': + this._abortController.abort() + break } } diff --git a/packages/core/src/highlevel/worker/protocol.ts b/packages/core/src/highlevel/worker/protocol.ts index 4a603578..144c371e 100644 --- a/packages/core/src/highlevel/worker/protocol.ts +++ b/packages/core/src/highlevel/worker/protocol.ts @@ -24,6 +24,7 @@ export type WorkerOutboundMessage = hasMin: boolean } | { type: 'error'; error: unknown } + | { type: 'stop' } | { type: 'conn_state'; state: ConnectionState } | { type: 'log' diff --git a/packages/core/src/highlevel/worker/worker.ts b/packages/core/src/highlevel/worker/worker.ts index 4cefb616..b4bea663 100644 --- a/packages/core/src/highlevel/worker/worker.ts +++ b/packages/core/src/highlevel/worker/worker.ts @@ -46,6 +46,7 @@ export abstract class TelegramWorker { state, }), ) + client.stopSignal.addEventListener('abort', () => this.broadcast({ type: 'stop' })) if (client.updates) { client.onUpdate((update, peers) => diff --git a/packages/core/src/network/client.ts b/packages/core/src/network/client.ts index c5b2c485..42d42a36 100644 --- a/packages/core/src/network/client.ts +++ b/packages/core/src/network/client.ts @@ -238,6 +238,9 @@ export class MtClient extends EventEmitter { readonly log: Logger readonly network: NetworkManager + private _abortController: AbortController + readonly stopSignal: AbortSignal + constructor(readonly params: MtClientOptions) { super() @@ -267,6 +270,9 @@ export class MtClient extends EventEmitter { this._readerMap = params.readerMap ?? defaultReaderMap this._writerMap = params.writerMap ?? defaultWriterMap + this._abortController = new AbortController() + this.stopSignal = this._abortController.signal + this.storage = new StorageManager({ provider: params.storage, log: this.log, @@ -299,6 +305,7 @@ export class MtClient extends EventEmitter { onConnecting: () => this.emit('connecting'), onNetworkChanged: (connected) => this.emit('networkChanged', connected), onUpdate: (upd) => this.emit('update', upd), + stopSignal: this.stopSignal, ...params.network, }, this._config, @@ -361,6 +368,7 @@ export class MtClient extends EventEmitter { this._prepare.reset() this._connect.reset() + this._abortController.abort() } /** diff --git a/packages/core/src/network/network-manager.ts b/packages/core/src/network/network-manager.ts index d3b3a955..8b627065 100644 --- a/packages/core/src/network/network-manager.ts +++ b/packages/core/src/network/network-manager.ts @@ -10,7 +10,7 @@ import { DcOptions, ICryptoProvider, Logger, - sleep, + sleepWithAbort, } from '../utils/index.js' import { assertTypeIs } from '../utils/type-assertions.js' import { ConfigManager } from './config-manager.js' @@ -61,6 +61,7 @@ export interface NetworkManagerParams { onUsable: () => void onConnecting: () => void onNetworkChanged: (connected: boolean) => void + stopSignal: AbortSignal } export type ConnectionCountDelegate = (kind: ConnectionKind, dcId: number, isPremium: boolean) => number @@ -738,7 +739,7 @@ export class NetworkManager { // flood waits below 3 seconds are "ignored" this._floodWaitedRequests.delete(message._) } else if (delta <= this.params.floodSleepThreshold) { - await sleep(delta) + await sleepWithAbort(delta, this.params.stopSignal) this._floodWaitedRequests.delete(message._) } else { const err = tl.RpcError.create(tl.RpcError.FLOOD, 'FLOOD_WAIT_%d') @@ -790,7 +791,7 @@ export class NetworkManager { if (e.text === 'WORKER_BUSY_TOO_LONG_RETRY') { // according to tdlib, "it is dangerous to resend query without timeout, so use 1" - await sleep(1000) + await sleepWithAbort(1000, this.params.stopSignal) } continue } @@ -809,7 +810,7 @@ export class NetworkManager { if (e.seconds <= floodSleepThreshold) { this._log.warn('%s resulted in a flood wait, will retry in %d seconds', message._, e.seconds) - await sleep(e.seconds * 1000) + await sleepWithAbort(e.seconds * 1000, this.params.stopSignal) continue } } diff --git a/packages/core/src/network/session-connection.ts b/packages/core/src/network/session-connection.ts index b7bdd340..28afbba7 100644 --- a/packages/core/src/network/session-connection.ts +++ b/packages/core/src/network/session-connection.ts @@ -166,6 +166,7 @@ export class SessionConnection extends PersistentConnection { this._salts.isFetching = false if (forever) { + clearTimeout(this._pfsUpdateTimeout) this.removeAllListeners() this.on('error', (err) => { this.log.warn('caught error after destroying: %s', err) diff --git a/packages/core/src/utils/misc-utils.ts b/packages/core/src/utils/misc-utils.ts index de90a790..3467604b 100644 --- a/packages/core/src/utils/misc-utils.ts +++ b/packages/core/src/utils/misc-utils.ts @@ -5,6 +5,25 @@ */ export const sleep = (ms: number): Promise => new Promise((resolve) => setTimeout(resolve, ms)) +export function sleepWithAbort(ms: number, signal: AbortSignal): Promise { + return new Promise((resolve, reject) => { + // eslint-disable-next-line prefer-const + let timeout: NodeJS.Timeout + + const onAbort = () => { + clearTimeout(timeout) + reject(signal.reason) + } + + signal.addEventListener('abort', onAbort) + + timeout = setTimeout(() => { + signal.removeEventListener('abort', onAbort) + resolve() + }, ms) + }) +} + export function getRandomInt(top: number): number { return Math.floor(Math.random() * top) }