fix(core): stability improvements and leaks fixes
This commit is contained in:
parent
0c615869d2
commit
f0451d56e3
6 changed files with 27 additions and 214 deletions
|
@ -41,6 +41,7 @@ export interface PendingRpc {
|
||||||
getState?: number
|
getState?: number
|
||||||
cancelled?: boolean
|
cancelled?: boolean
|
||||||
timeout?: timers.Timer
|
timeout?: timers.Timer
|
||||||
|
resetAbortSignal?: () => void
|
||||||
}
|
}
|
||||||
|
|
||||||
export type PendingMessage =
|
export type PendingMessage =
|
||||||
|
|
|
@ -173,10 +173,16 @@ export abstract class PersistentConnection {
|
||||||
}
|
}
|
||||||
|
|
||||||
async disconnectManual(): Promise<void> {
|
async disconnectManual(): Promise<void> {
|
||||||
|
if (this._inactivityTimeout) {
|
||||||
|
timers.clearTimeout(this._inactivityTimeout)
|
||||||
|
}
|
||||||
await this._fuman.close()
|
await this._fuman.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
async destroy(): Promise<void> {
|
async destroy(): Promise<void> {
|
||||||
|
if (this._inactivityTimeout) {
|
||||||
|
timers.clearTimeout(this._inactivityTimeout)
|
||||||
|
}
|
||||||
this._destroyed = true
|
this._destroyed = true
|
||||||
await this._fuman.close()
|
await this._fuman.close()
|
||||||
}
|
}
|
||||||
|
|
|
@ -768,6 +768,8 @@ export class SessionConnection extends PersistentConnection {
|
||||||
|
|
||||||
const rpc = msg.rpc
|
const rpc = msg.rpc
|
||||||
|
|
||||||
|
rpc.resetAbortSignal?.()
|
||||||
|
|
||||||
const resultConstructorId = message.peekUint()
|
const resultConstructorId = message.peekUint()
|
||||||
|
|
||||||
let result: any
|
let result: any
|
||||||
|
@ -1451,6 +1453,7 @@ export class SessionConnection extends PersistentConnection {
|
||||||
acked: undefined,
|
acked: undefined,
|
||||||
cancelled: undefined,
|
cancelled: undefined,
|
||||||
timeout: undefined,
|
timeout: undefined,
|
||||||
|
resetAbortSignal: undefined,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (abortSignal?.aborted) {
|
if (abortSignal?.aborted) {
|
||||||
|
@ -1464,7 +1467,9 @@ export class SessionConnection extends PersistentConnection {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (abortSignal) {
|
if (abortSignal) {
|
||||||
abortSignal.addEventListener('abort', () => this._cancelRpc(pending, false, abortSignal))
|
const cancel = () => this._cancelRpc(pending, false, abortSignal)
|
||||||
|
abortSignal.addEventListener('abort', cancel)
|
||||||
|
pending.resetAbortSignal = () => abortSignal.removeEventListener('abort', cancel)
|
||||||
}
|
}
|
||||||
|
|
||||||
this._enqueueRpc(pending, true)
|
this._enqueueRpc(pending, true)
|
||||||
|
@ -1495,6 +1500,8 @@ export class SessionConnection extends PersistentConnection {
|
||||||
timers.clearTimeout(rpc.timeout)
|
timers.clearTimeout(rpc.timeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rpc.resetAbortSignal?.()
|
||||||
|
|
||||||
if (onTimeout) {
|
if (onTimeout) {
|
||||||
rpc.promise.resolve({
|
rpc.promise.resolve({
|
||||||
_: 'mt_rpc_error',
|
_: 'mt_rpc_error',
|
||||||
|
|
|
@ -1,10 +1,20 @@
|
||||||
export function combineAbortSignals(signal1: AbortSignal, signal2?: AbortSignal): AbortSignal {
|
export function combineAbortSignals(signal1: AbortSignal, signal2?: AbortSignal): AbortSignal {
|
||||||
if (!signal2) return signal1
|
if (!signal2) return signal1
|
||||||
|
|
||||||
|
if (typeof AbortSignal.any === 'function') {
|
||||||
|
return AbortSignal.any([signal1, signal2])
|
||||||
|
}
|
||||||
|
|
||||||
const controller = new AbortController()
|
const controller = new AbortController()
|
||||||
|
|
||||||
signal1.addEventListener('abort', () => controller.abort())
|
function abort(this: AbortSignal) {
|
||||||
signal2.addEventListener('abort', () => controller.abort())
|
controller.abort(this.reason)
|
||||||
|
signal1.removeEventListener('abort', abort)
|
||||||
|
signal2!.removeEventListener('abort', abort)
|
||||||
|
}
|
||||||
|
|
||||||
|
signal1.addEventListener('abort', abort)
|
||||||
|
signal2.addEventListener('abort', abort)
|
||||||
|
|
||||||
return controller.signal
|
return controller.signal
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,180 +0,0 @@
|
||||||
import type { Middleware } from './composer.js'
|
|
||||||
|
|
||||||
import { describe, expect, it } from 'vitest'
|
|
||||||
import { composeMiddlewares } from './composer.js'
|
|
||||||
|
|
||||||
describe('composeMiddlewares', () => {
|
|
||||||
it('should compose middlewares', async () => {
|
|
||||||
const trace: unknown[] = []
|
|
||||||
const middlewares: Middleware<number[]>[] = [
|
|
||||||
async (ctx, next) => {
|
|
||||||
trace.push(ctx)
|
|
||||||
trace.push(1)
|
|
||||||
await next([...ctx, 1])
|
|
||||||
trace.push(6)
|
|
||||||
},
|
|
||||||
async (ctx, next) => {
|
|
||||||
trace.push(ctx)
|
|
||||||
trace.push(2)
|
|
||||||
await next([...ctx, 2])
|
|
||||||
trace.push(5)
|
|
||||||
},
|
|
||||||
async (ctx, next) => {
|
|
||||||
trace.push(ctx)
|
|
||||||
trace.push(3)
|
|
||||||
await next([...ctx, 3])
|
|
||||||
trace.push(4)
|
|
||||||
},
|
|
||||||
]
|
|
||||||
|
|
||||||
const composed = composeMiddlewares(middlewares, async (res) => {
|
|
||||||
// eslint-disable-next-line ts/no-use-before-define
|
|
||||||
result = res
|
|
||||||
})
|
|
||||||
|
|
||||||
let result: readonly number[] = []
|
|
||||||
await composed([])
|
|
||||||
|
|
||||||
expect(trace).toEqual([[], 1, [1], 2, [1, 2], 3, 4, 5, 6])
|
|
||||||
expect(result).toEqual([1, 2, 3])
|
|
||||||
})
|
|
||||||
|
|
||||||
it('should handle multiple calls to final', async () => {
|
|
||||||
const trace: unknown[] = []
|
|
||||||
|
|
||||||
const middlewares: Middleware<number[]>[] = [
|
|
||||||
async (ctx, next) => {
|
|
||||||
trace.push(1)
|
|
||||||
await next([2])
|
|
||||||
trace.push(3)
|
|
||||||
await next([4])
|
|
||||||
trace.push(5)
|
|
||||||
},
|
|
||||||
]
|
|
||||||
|
|
||||||
const composed = composeMiddlewares(middlewares, async (res) => {
|
|
||||||
trace.push(res)
|
|
||||||
})
|
|
||||||
|
|
||||||
await composed([])
|
|
||||||
|
|
||||||
expect(trace).toEqual([1, [2], 3, [4], 5])
|
|
||||||
})
|
|
||||||
|
|
||||||
it('should handle multiple calls to next midway', async () => {
|
|
||||||
const trace: unknown[] = []
|
|
||||||
|
|
||||||
const middlewares: Middleware<number[]>[] = [
|
|
||||||
async (ctx, next) => {
|
|
||||||
trace.push(1)
|
|
||||||
await next([2])
|
|
||||||
trace.push(3)
|
|
||||||
await next([4])
|
|
||||||
trace.push(5)
|
|
||||||
},
|
|
||||||
(ctx, next) => next([6, ...ctx]),
|
|
||||||
]
|
|
||||||
|
|
||||||
const composed = composeMiddlewares(middlewares, async (res) => {
|
|
||||||
trace.push(res)
|
|
||||||
})
|
|
||||||
|
|
||||||
await composed([])
|
|
||||||
|
|
||||||
expect(trace).toEqual([1, [6, 2], 3, [6, 4], 5])
|
|
||||||
})
|
|
||||||
|
|
||||||
it('should handle leaf middleware', async () => {
|
|
||||||
const trace: unknown[] = []
|
|
||||||
|
|
||||||
const middlewares: Middleware<number[]>[] = [
|
|
||||||
async (ctx, next) => {
|
|
||||||
trace.push(1)
|
|
||||||
|
|
||||||
return next(ctx)
|
|
||||||
},
|
|
||||||
async () => {
|
|
||||||
/* do nothing */
|
|
||||||
},
|
|
||||||
]
|
|
||||||
|
|
||||||
const composed = composeMiddlewares(middlewares, async (res) => {
|
|
||||||
trace.push(res) // should not be called
|
|
||||||
})
|
|
||||||
|
|
||||||
await composed([])
|
|
||||||
|
|
||||||
expect(trace).toEqual([1])
|
|
||||||
})
|
|
||||||
|
|
||||||
it('should propagate return value', async () => {
|
|
||||||
const trace: unknown[] = []
|
|
||||||
|
|
||||||
const middlewares: Middleware<number[], number>[] = [
|
|
||||||
async (ctx, next) => {
|
|
||||||
trace.push(1)
|
|
||||||
const res = await next([2])
|
|
||||||
trace.push(3)
|
|
||||||
const res2 = await next([3, 4, 5])
|
|
||||||
trace.push(6)
|
|
||||||
|
|
||||||
return res + res2
|
|
||||||
},
|
|
||||||
async (ctx, next) => {
|
|
||||||
trace.push(-1)
|
|
||||||
|
|
||||||
return (await next(ctx)) + 1
|
|
||||||
},
|
|
||||||
]
|
|
||||||
|
|
||||||
const composed = composeMiddlewares(middlewares, async (res) => {
|
|
||||||
trace.push(res)
|
|
||||||
|
|
||||||
return res.length
|
|
||||||
})
|
|
||||||
|
|
||||||
const result = await composed([])
|
|
||||||
|
|
||||||
expect(trace).toEqual([1, -1, [2], 3, -1, [3, 4, 5], 6])
|
|
||||||
expect(result).toBe(6)
|
|
||||||
})
|
|
||||||
|
|
||||||
it('should propagate errors', async () => {
|
|
||||||
const trace: unknown[] = []
|
|
||||||
|
|
||||||
const middlewares: Middleware<number>[] = [
|
|
||||||
async (ctx, next) => {
|
|
||||||
trace.push(1)
|
|
||||||
|
|
||||||
try {
|
|
||||||
await next(2)
|
|
||||||
} catch {
|
|
||||||
trace.push('caught error')
|
|
||||||
}
|
|
||||||
|
|
||||||
trace.push(3)
|
|
||||||
await next(4)
|
|
||||||
trace.push(5)
|
|
||||||
},
|
|
||||||
(ctx, next) => next(ctx), // pass-thru
|
|
||||||
async (ctx, next) => {
|
|
||||||
if (ctx === 2) {
|
|
||||||
trace.push('error')
|
|
||||||
throw new Error('error')
|
|
||||||
} else {
|
|
||||||
trace.push('ok')
|
|
||||||
|
|
||||||
return next(ctx)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
]
|
|
||||||
|
|
||||||
const composed = composeMiddlewares(middlewares, async (res) => {
|
|
||||||
trace.push(`final ${res}`)
|
|
||||||
})
|
|
||||||
|
|
||||||
await composed(0)
|
|
||||||
|
|
||||||
expect(trace).toEqual([1, 'error', 'caught error', 3, 'ok', 'final 4', 5])
|
|
||||||
})
|
|
||||||
})
|
|
|
@ -1,31 +0,0 @@
|
||||||
export type Middleware<Context, Result = void> = (
|
|
||||||
ctx: Context,
|
|
||||||
next: (ctx: Context) => Promise<Result>,
|
|
||||||
) => Promise<Result>
|
|
||||||
export type ComposedMiddleware<Context, Result = void> = (ctx: Context) => Promise<Result>
|
|
||||||
|
|
||||||
export function composeMiddlewares<Context, Result = void>(
|
|
||||||
middlewares: Middleware<Context, Result>[],
|
|
||||||
final: ComposedMiddleware<Context, Result>,
|
|
||||||
): ComposedMiddleware<Context, Result> {
|
|
||||||
middlewares = middlewares.slice()
|
|
||||||
middlewares.push(final)
|
|
||||||
|
|
||||||
function dispatch(i: number, ctx: Context): Promise<Result> {
|
|
||||||
const fn = middlewares[i]
|
|
||||||
if (!fn) return final(ctx)
|
|
||||||
|
|
||||||
// eslint-disable-next-line ts/no-use-before-define
|
|
||||||
return fn(ctx, boundDispatches[i + 1])
|
|
||||||
}
|
|
||||||
|
|
||||||
const boundDispatches: Array<(ctx: Context) => Promise<Result>> = []
|
|
||||||
|
|
||||||
for (let i = 0; i < middlewares.length; i++) {
|
|
||||||
boundDispatches.push(dispatch.bind(null, i))
|
|
||||||
}
|
|
||||||
|
|
||||||
return function (context: Context): Promise<Result> {
|
|
||||||
return boundDispatches[0](context)
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in a new issue