fix(core): fixed some dangling timers

This commit is contained in:
alina 🌸 2024-06-25 00:01:49 +03:00
parent 93c44412e6
commit 85f6610e09
Signed by: teidesu
SSH key fingerprint: SHA256:uNeCpw6aTSU4aIObXLvHfLkDa82HWH9EiOj9AXOIRpI
12 changed files with 54 additions and 6 deletions

View file

@ -321,4 +321,8 @@ export class BaseTelegramClient implements ITelegramClient {
computeNewPasswordHash(algo: tl.TypePasswordKdfAlgo, password: string): Promise<Uint8Array> { computeNewPasswordHash(algo: tl.TypePasswordKdfAlgo, password: string): Promise<Uint8Array> {
return computeNewPasswordHash(this.crypto, algo, password) return computeNewPasswordHash(this.crypto, algo, password)
} }
get stopSignal(): AbortSignal {
return this.mt.stopSignal
}
} }

View file

@ -5557,6 +5557,9 @@ export class TelegramClient extends EventEmitter implements ITelegramClient {
this.log = this._client.log this.log = this._client.log
// @ts-expect-error codegen // @ts-expect-error codegen
this.storage = this._client.storage this.storage = this._client.storage
Object.defineProperty(this, 'stopSignal', {
get: () => this._client.stopSignal,
})
if (!opts.disableUpdates) { if (!opts.disableUpdates) {
const skipConversationUpdates = opts.skipConversationUpdates ?? true const skipConversationUpdates = opts.skipConversationUpdates ?? true

View file

@ -30,6 +30,7 @@ export interface ITelegramClient {
readonly log: Logger readonly log: Logger
readonly storage: PublicPart<TelegramStorageManager> readonly storage: PublicPart<TelegramStorageManager>
readonly appConfig: PublicPart<AppConfigManager> readonly appConfig: PublicPart<AppConfigManager>
readonly stopSignal: AbortSignal
prepare(): Promise<void> prepare(): Promise<void>
connect(): Promise<void> connect(): Promise<void>

View file

@ -64,6 +64,9 @@ function _initializeClient(this: TelegramClient, opts: TelegramClientOptions) {
this.log = this._client.log this.log = this._client.log
// @ts-expect-error codegen // @ts-expect-error codegen
this.storage = this._client.storage this.storage = this._client.storage
Object.defineProperty(this, 'stopSignal', {
get: () => this._client.stopSignal,
})
if (!opts.disableUpdates) { if (!opts.disableUpdates) {
const skipConversationUpdates = opts.skipConversationUpdates ?? true const skipConversationUpdates = opts.skipConversationUpdates ?? true

View file

@ -1,4 +1,4 @@
import { sleep } from '../../../utils/misc-utils.js' import { sleepWithAbort } from '../../../utils/misc-utils.js'
import { ITelegramClient } from '../../client.types.js' import { ITelegramClient } from '../../client.types.js'
import { InputPeerLike, Message } from '../../types/index.js' import { InputPeerLike, Message } from '../../types/index.js'
import { isInputPeerChannel } from '../../utils/peer-utils.js' import { isInputPeerChannel } from '../../utils/peer-utils.js'
@ -32,7 +32,7 @@ export async function kickChatMember(
// not needed in case this is a legacy group // not needed in case this is a legacy group
if (isInputPeerChannel(chat)) { if (isInputPeerChannel(chat)) {
// i fucking love telegram serverside race conditions // i fucking love telegram serverside race conditions
await sleep(1000) await sleepWithAbort(1000, client.stopSignal)
await unbanChatMember(client, { chatId: chat, participantId: user }) await unbanChatMember(client, { chatId: chat, participantId: user })
} }

View file

@ -42,6 +42,9 @@ export abstract class TelegramWorkerPort<Custom extends WorkerCustomMethods> imp
readonly startUpdatesLoop readonly startUpdatesLoop
readonly stopUpdatesLoop readonly stopUpdatesLoop
private _abortController = new AbortController()
readonly stopSignal = this._abortController.signal
constructor(readonly options: TelegramWorkerPortOptions) { constructor(readonly options: TelegramWorkerPortOptions) {
this.log = new LogManager('worker') this.log = new LogManager('worker')
@ -122,6 +125,9 @@ export abstract class TelegramWorkerPort<Custom extends WorkerCustomMethods> imp
case 'error': case 'error':
this.emitError(message.error) this.emitError(message.error)
break break
case 'stop':
this._abortController.abort()
break
} }
} }

View file

@ -24,6 +24,7 @@ export type WorkerOutboundMessage =
hasMin: boolean hasMin: boolean
} }
| { type: 'error'; error: unknown } | { type: 'error'; error: unknown }
| { type: 'stop' }
| { type: 'conn_state'; state: ConnectionState } | { type: 'conn_state'; state: ConnectionState }
| { | {
type: 'log' type: 'log'

View file

@ -46,6 +46,7 @@ export abstract class TelegramWorker<T extends WorkerCustomMethods> {
state, state,
}), }),
) )
client.stopSignal.addEventListener('abort', () => this.broadcast({ type: 'stop' }))
if (client.updates) { if (client.updates) {
client.onUpdate((update, peers) => client.onUpdate((update, peers) =>

View file

@ -238,6 +238,9 @@ export class MtClient extends EventEmitter {
readonly log: Logger readonly log: Logger
readonly network: NetworkManager readonly network: NetworkManager
private _abortController: AbortController
readonly stopSignal: AbortSignal
constructor(readonly params: MtClientOptions) { constructor(readonly params: MtClientOptions) {
super() super()
@ -267,6 +270,9 @@ export class MtClient extends EventEmitter {
this._readerMap = params.readerMap ?? defaultReaderMap this._readerMap = params.readerMap ?? defaultReaderMap
this._writerMap = params.writerMap ?? defaultWriterMap this._writerMap = params.writerMap ?? defaultWriterMap
this._abortController = new AbortController()
this.stopSignal = this._abortController.signal
this.storage = new StorageManager({ this.storage = new StorageManager({
provider: params.storage, provider: params.storage,
log: this.log, log: this.log,
@ -299,6 +305,7 @@ export class MtClient extends EventEmitter {
onConnecting: () => this.emit('connecting'), onConnecting: () => this.emit('connecting'),
onNetworkChanged: (connected) => this.emit('networkChanged', connected), onNetworkChanged: (connected) => this.emit('networkChanged', connected),
onUpdate: (upd) => this.emit('update', upd), onUpdate: (upd) => this.emit('update', upd),
stopSignal: this.stopSignal,
...params.network, ...params.network,
}, },
this._config, this._config,
@ -361,6 +368,7 @@ export class MtClient extends EventEmitter {
this._prepare.reset() this._prepare.reset()
this._connect.reset() this._connect.reset()
this._abortController.abort()
} }
/** /**

View file

@ -10,7 +10,7 @@ import {
DcOptions, DcOptions,
ICryptoProvider, ICryptoProvider,
Logger, Logger,
sleep, sleepWithAbort,
} from '../utils/index.js' } from '../utils/index.js'
import { assertTypeIs } from '../utils/type-assertions.js' import { assertTypeIs } from '../utils/type-assertions.js'
import { ConfigManager } from './config-manager.js' import { ConfigManager } from './config-manager.js'
@ -61,6 +61,7 @@ export interface NetworkManagerParams {
onUsable: () => void onUsable: () => void
onConnecting: () => void onConnecting: () => void
onNetworkChanged: (connected: boolean) => void onNetworkChanged: (connected: boolean) => void
stopSignal: AbortSignal
} }
export type ConnectionCountDelegate = (kind: ConnectionKind, dcId: number, isPremium: boolean) => number export type ConnectionCountDelegate = (kind: ConnectionKind, dcId: number, isPremium: boolean) => number
@ -738,7 +739,7 @@ export class NetworkManager {
// flood waits below 3 seconds are "ignored" // flood waits below 3 seconds are "ignored"
this._floodWaitedRequests.delete(message._) this._floodWaitedRequests.delete(message._)
} else if (delta <= this.params.floodSleepThreshold) { } else if (delta <= this.params.floodSleepThreshold) {
await sleep(delta) await sleepWithAbort(delta, this.params.stopSignal)
this._floodWaitedRequests.delete(message._) this._floodWaitedRequests.delete(message._)
} else { } else {
const err = tl.RpcError.create(tl.RpcError.FLOOD, 'FLOOD_WAIT_%d') const err = tl.RpcError.create(tl.RpcError.FLOOD, 'FLOOD_WAIT_%d')
@ -790,7 +791,7 @@ export class NetworkManager {
if (e.text === 'WORKER_BUSY_TOO_LONG_RETRY') { if (e.text === 'WORKER_BUSY_TOO_LONG_RETRY') {
// according to tdlib, "it is dangerous to resend query without timeout, so use 1" // according to tdlib, "it is dangerous to resend query without timeout, so use 1"
await sleep(1000) await sleepWithAbort(1000, this.params.stopSignal)
} }
continue continue
} }
@ -809,7 +810,7 @@ export class NetworkManager {
if (e.seconds <= floodSleepThreshold) { if (e.seconds <= floodSleepThreshold) {
this._log.warn('%s resulted in a flood wait, will retry in %d seconds', message._, e.seconds) this._log.warn('%s resulted in a flood wait, will retry in %d seconds', message._, e.seconds)
await sleep(e.seconds * 1000) await sleepWithAbort(e.seconds * 1000, this.params.stopSignal)
continue continue
} }
} }

View file

@ -166,6 +166,7 @@ export class SessionConnection extends PersistentConnection {
this._salts.isFetching = false this._salts.isFetching = false
if (forever) { if (forever) {
clearTimeout(this._pfsUpdateTimeout)
this.removeAllListeners() this.removeAllListeners()
this.on('error', (err) => { this.on('error', (err) => {
this.log.warn('caught error after destroying: %s', err) this.log.warn('caught error after destroying: %s', err)

View file

@ -5,6 +5,25 @@
*/ */
export const sleep = (ms: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, ms)) export const sleep = (ms: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, ms))
export function sleepWithAbort(ms: number, signal: AbortSignal): Promise<void> {
return new Promise((resolve, reject) => {
// eslint-disable-next-line prefer-const
let timeout: NodeJS.Timeout
const onAbort = () => {
clearTimeout(timeout)
reject(signal.reason)
}
signal.addEventListener('abort', onAbort)
timeout = setTimeout(() => {
signal.removeEventListener('abort', onAbort)
resolve()
}, ms)
})
}
export function getRandomInt(top: number): number { export function getRandomInt(top: number): number {
return Math.floor(Math.random() * top) return Math.floor(Math.random() * top)
} }