diff --git a/packages/core/src/network/mtproto-session.ts b/packages/core/src/network/mtproto-session.ts index a97801f0..91fd45d4 100644 --- a/packages/core/src/network/mtproto-session.ts +++ b/packages/core/src/network/mtproto-session.ts @@ -41,6 +41,7 @@ export interface PendingRpc { getState?: number cancelled?: boolean timeout?: timers.Timer + resetAbortSignal?: () => void } export type PendingMessage = diff --git a/packages/core/src/network/persistent-connection.ts b/packages/core/src/network/persistent-connection.ts index 922325af..9ef89309 100644 --- a/packages/core/src/network/persistent-connection.ts +++ b/packages/core/src/network/persistent-connection.ts @@ -173,10 +173,16 @@ export abstract class PersistentConnection { } async disconnectManual(): Promise { + if (this._inactivityTimeout) { + timers.clearTimeout(this._inactivityTimeout) + } await this._fuman.close() } async destroy(): Promise { + if (this._inactivityTimeout) { + timers.clearTimeout(this._inactivityTimeout) + } this._destroyed = true await this._fuman.close() } diff --git a/packages/core/src/network/session-connection.ts b/packages/core/src/network/session-connection.ts index d52d1b11..572c93d4 100644 --- a/packages/core/src/network/session-connection.ts +++ b/packages/core/src/network/session-connection.ts @@ -768,6 +768,8 @@ export class SessionConnection extends PersistentConnection { const rpc = msg.rpc + rpc.resetAbortSignal?.() + const resultConstructorId = message.peekUint() let result: any @@ -1451,6 +1453,7 @@ export class SessionConnection extends PersistentConnection { acked: undefined, cancelled: undefined, timeout: undefined, + resetAbortSignal: undefined, } if (abortSignal?.aborted) { @@ -1464,7 +1467,9 @@ export class SessionConnection extends PersistentConnection { } if (abortSignal) { - abortSignal.addEventListener('abort', () => this._cancelRpc(pending, false, abortSignal)) + const cancel = () => this._cancelRpc(pending, false, abortSignal) + abortSignal.addEventListener('abort', cancel) + pending.resetAbortSignal = () => abortSignal.removeEventListener('abort', cancel) } this._enqueueRpc(pending, true) @@ -1495,6 +1500,8 @@ export class SessionConnection extends PersistentConnection { timers.clearTimeout(rpc.timeout) } + rpc.resetAbortSignal?.() + if (onTimeout) { rpc.promise.resolve({ _: 'mt_rpc_error', diff --git a/packages/core/src/utils/abort-signal.ts b/packages/core/src/utils/abort-signal.ts index 876d4c0c..aeda77f8 100644 --- a/packages/core/src/utils/abort-signal.ts +++ b/packages/core/src/utils/abort-signal.ts @@ -1,10 +1,20 @@ export function combineAbortSignals(signal1: AbortSignal, signal2?: AbortSignal): AbortSignal { if (!signal2) return signal1 + if (typeof AbortSignal.any === 'function') { + return AbortSignal.any([signal1, signal2]) + } + const controller = new AbortController() - signal1.addEventListener('abort', () => controller.abort()) - signal2.addEventListener('abort', () => controller.abort()) + function abort(this: AbortSignal) { + controller.abort(this.reason) + signal1.removeEventListener('abort', abort) + signal2!.removeEventListener('abort', abort) + } + + signal1.addEventListener('abort', abort) + signal2.addEventListener('abort', abort) return controller.signal } diff --git a/packages/core/src/utils/composer.test.ts b/packages/core/src/utils/composer.test.ts deleted file mode 100644 index 8729d8d7..00000000 --- a/packages/core/src/utils/composer.test.ts +++ /dev/null @@ -1,180 +0,0 @@ -import type { Middleware } from './composer.js' - -import { describe, expect, it } from 'vitest' -import { composeMiddlewares } from './composer.js' - -describe('composeMiddlewares', () => { - it('should compose middlewares', async () => { - const trace: unknown[] = [] - const middlewares: Middleware[] = [ - async (ctx, next) => { - trace.push(ctx) - trace.push(1) - await next([...ctx, 1]) - trace.push(6) - }, - async (ctx, next) => { - trace.push(ctx) - trace.push(2) - await next([...ctx, 2]) - trace.push(5) - }, - async (ctx, next) => { - trace.push(ctx) - trace.push(3) - await next([...ctx, 3]) - trace.push(4) - }, - ] - - const composed = composeMiddlewares(middlewares, async (res) => { - // eslint-disable-next-line ts/no-use-before-define - result = res - }) - - let result: readonly number[] = [] - await composed([]) - - expect(trace).toEqual([[], 1, [1], 2, [1, 2], 3, 4, 5, 6]) - expect(result).toEqual([1, 2, 3]) - }) - - it('should handle multiple calls to final', async () => { - const trace: unknown[] = [] - - const middlewares: Middleware[] = [ - async (ctx, next) => { - trace.push(1) - await next([2]) - trace.push(3) - await next([4]) - trace.push(5) - }, - ] - - const composed = composeMiddlewares(middlewares, async (res) => { - trace.push(res) - }) - - await composed([]) - - expect(trace).toEqual([1, [2], 3, [4], 5]) - }) - - it('should handle multiple calls to next midway', async () => { - const trace: unknown[] = [] - - const middlewares: Middleware[] = [ - async (ctx, next) => { - trace.push(1) - await next([2]) - trace.push(3) - await next([4]) - trace.push(5) - }, - (ctx, next) => next([6, ...ctx]), - ] - - const composed = composeMiddlewares(middlewares, async (res) => { - trace.push(res) - }) - - await composed([]) - - expect(trace).toEqual([1, [6, 2], 3, [6, 4], 5]) - }) - - it('should handle leaf middleware', async () => { - const trace: unknown[] = [] - - const middlewares: Middleware[] = [ - async (ctx, next) => { - trace.push(1) - - return next(ctx) - }, - async () => { - /* do nothing */ - }, - ] - - const composed = composeMiddlewares(middlewares, async (res) => { - trace.push(res) // should not be called - }) - - await composed([]) - - expect(trace).toEqual([1]) - }) - - it('should propagate return value', async () => { - const trace: unknown[] = [] - - const middlewares: Middleware[] = [ - async (ctx, next) => { - trace.push(1) - const res = await next([2]) - trace.push(3) - const res2 = await next([3, 4, 5]) - trace.push(6) - - return res + res2 - }, - async (ctx, next) => { - trace.push(-1) - - return (await next(ctx)) + 1 - }, - ] - - const composed = composeMiddlewares(middlewares, async (res) => { - trace.push(res) - - return res.length - }) - - const result = await composed([]) - - expect(trace).toEqual([1, -1, [2], 3, -1, [3, 4, 5], 6]) - expect(result).toBe(6) - }) - - it('should propagate errors', async () => { - const trace: unknown[] = [] - - const middlewares: Middleware[] = [ - async (ctx, next) => { - trace.push(1) - - try { - await next(2) - } catch { - trace.push('caught error') - } - - trace.push(3) - await next(4) - trace.push(5) - }, - (ctx, next) => next(ctx), // pass-thru - async (ctx, next) => { - if (ctx === 2) { - trace.push('error') - throw new Error('error') - } else { - trace.push('ok') - - return next(ctx) - } - }, - ] - - const composed = composeMiddlewares(middlewares, async (res) => { - trace.push(`final ${res}`) - }) - - await composed(0) - - expect(trace).toEqual([1, 'error', 'caught error', 3, 'ok', 'final 4', 5]) - }) -}) diff --git a/packages/core/src/utils/composer.ts b/packages/core/src/utils/composer.ts deleted file mode 100644 index 8e233620..00000000 --- a/packages/core/src/utils/composer.ts +++ /dev/null @@ -1,31 +0,0 @@ -export type Middleware = ( - ctx: Context, - next: (ctx: Context) => Promise, -) => Promise -export type ComposedMiddleware = (ctx: Context) => Promise - -export function composeMiddlewares( - middlewares: Middleware[], - final: ComposedMiddleware, -): ComposedMiddleware { - middlewares = middlewares.slice() - middlewares.push(final) - - function dispatch(i: number, ctx: Context): Promise { - const fn = middlewares[i] - if (!fn) return final(ctx) - - // eslint-disable-next-line ts/no-use-before-define - return fn(ctx, boundDispatches[i + 1]) - } - - const boundDispatches: Array<(ctx: Context) => Promise> = [] - - for (let i = 0; i < middlewares.length; i++) { - boundDispatches.push(dispatch.bind(null, i)) - } - - return function (context: Context): Promise { - return boundDispatches[0](context) - } -}