fix(core): abort signal with workers
This commit is contained in:
parent
a78610af5f
commit
0290bb429a
4 changed files with 82 additions and 12 deletions
|
@ -10,7 +10,7 @@ export class WorkerInvoker {
|
||||||
private _nextId = 0
|
private _nextId = 0
|
||||||
private _pending = new Map<number, ControllablePromise>()
|
private _pending = new Map<number, ControllablePromise>()
|
||||||
|
|
||||||
private _invoke(target: InvokeTarget, method: string, args: unknown[], isVoid: boolean) {
|
private _invoke(target: InvokeTarget, method: string, args: unknown[], isVoid: boolean, abortSignal?: AbortSignal) {
|
||||||
const id = this._nextId++
|
const id = this._nextId++
|
||||||
|
|
||||||
this.send({
|
this.send({
|
||||||
|
@ -20,6 +20,14 @@ export class WorkerInvoker {
|
||||||
method,
|
method,
|
||||||
args,
|
args,
|
||||||
void: isVoid,
|
void: isVoid,
|
||||||
|
withAbort: Boolean(abortSignal),
|
||||||
|
})
|
||||||
|
|
||||||
|
abortSignal?.addEventListener('abort', () => {
|
||||||
|
this.send({
|
||||||
|
type: 'abort',
|
||||||
|
id,
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
if (!isVoid) {
|
if (!isVoid) {
|
||||||
|
@ -40,6 +48,12 @@ export class WorkerInvoker {
|
||||||
this._invoke(target, method, args, true)
|
this._invoke(target, method, args, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
invokeWithAbort(target: InvokeTarget, method: string, args: unknown[], abortSignal: AbortSignal): Promise<unknown> {
|
||||||
|
if (abortSignal.aborted) return Promise.reject(abortSignal.reason)
|
||||||
|
|
||||||
|
return this._invoke(target, method, args, false, abortSignal) as Promise<unknown>
|
||||||
|
}
|
||||||
|
|
||||||
handleResult(msg: Extract<WorkerOutboundMessage, { type: 'result' }>) {
|
handleResult(msg: Extract<WorkerOutboundMessage, { type: 'result' }>) {
|
||||||
const promise = this._pending.get(msg.id)
|
const promise = this._pending.get(msg.id)
|
||||||
if (!promise) return
|
if (!promise) return
|
||||||
|
|
|
@ -1,3 +1,7 @@
|
||||||
|
import { tl } from '@mtcute/tl'
|
||||||
|
|
||||||
|
import { RpcCallOptions } from '../../network/network-manager.js'
|
||||||
|
import { MustEqual } from '../../types/utils.js'
|
||||||
import { LogManager } from '../../utils/logger.js'
|
import { LogManager } from '../../utils/logger.js'
|
||||||
import { ConnectionState, ITelegramClient, ServerUpdateHandler } from '../client.types.js'
|
import { ConnectionState, ITelegramClient, ServerUpdateHandler } from '../client.types.js'
|
||||||
import { PeersIndex } from '../types/peers/peers-index.js'
|
import { PeersIndex } from '../types/peers/peers-index.js'
|
||||||
|
@ -28,7 +32,6 @@ export abstract class TelegramWorkerPort<Custom extends WorkerCustomMethods> imp
|
||||||
readonly notifyLoggedOut
|
readonly notifyLoggedOut
|
||||||
readonly notifyChannelOpened
|
readonly notifyChannelOpened
|
||||||
readonly notifyChannelClosed
|
readonly notifyChannelClosed
|
||||||
readonly call
|
|
||||||
readonly importSession
|
readonly importSession
|
||||||
readonly exportSession
|
readonly exportSession
|
||||||
readonly handleClientUpdate
|
readonly handleClientUpdate
|
||||||
|
@ -63,7 +66,6 @@ export abstract class TelegramWorkerPort<Custom extends WorkerCustomMethods> imp
|
||||||
this.notifyLoggedOut = bind('notifyLoggedOut')
|
this.notifyLoggedOut = bind('notifyLoggedOut')
|
||||||
this.notifyChannelOpened = bind('notifyChannelOpened')
|
this.notifyChannelOpened = bind('notifyChannelOpened')
|
||||||
this.notifyChannelClosed = bind('notifyChannelClosed')
|
this.notifyChannelClosed = bind('notifyChannelClosed')
|
||||||
this.call = bind('call')
|
|
||||||
this.importSession = bind('importSession')
|
this.importSession = bind('importSession')
|
||||||
this.exportSession = bind('exportSession')
|
this.exportSession = bind('exportSession')
|
||||||
this.handleClientUpdate = bind('handleClientUpdate', true)
|
this.handleClientUpdate = bind('handleClientUpdate', true)
|
||||||
|
@ -77,6 +79,21 @@ export abstract class TelegramWorkerPort<Custom extends WorkerCustomMethods> imp
|
||||||
this.stopUpdatesLoop = bind('stopUpdatesLoop')
|
this.stopUpdatesLoop = bind('stopUpdatesLoop')
|
||||||
}
|
}
|
||||||
|
|
||||||
|
call<T extends tl.RpcMethod>(
|
||||||
|
message: MustEqual<T, tl.RpcMethod>,
|
||||||
|
params?: RpcCallOptions,
|
||||||
|
): Promise<tl.RpcCallReturn[T['_']]> {
|
||||||
|
if (params?.abortSignal) {
|
||||||
|
const { abortSignal, ...rest } = params
|
||||||
|
|
||||||
|
return this._invoker.invokeWithAbort('client', 'call', [message, rest], abortSignal) as Promise<
|
||||||
|
tl.RpcCallReturn[T['_']]
|
||||||
|
>
|
||||||
|
}
|
||||||
|
|
||||||
|
return this._invoker.invoke('client', 'call', [message, params]) as Promise<tl.RpcCallReturn[T['_']]>
|
||||||
|
}
|
||||||
|
|
||||||
abstract connectToWorker(worker: SomeWorker, handler: ClientMessageHandler): [SendFn, () => void]
|
abstract connectToWorker(worker: SomeWorker, handler: ClientMessageHandler): [SendFn, () => void]
|
||||||
|
|
||||||
private _serverUpdatesHandler: ServerUpdateHandler = () => {}
|
private _serverUpdatesHandler: ServerUpdateHandler = () => {}
|
||||||
|
|
|
@ -5,14 +5,20 @@ import { tl } from '@mtcute/tl'
|
||||||
import { ConnectionState } from '../client.types.js'
|
import { ConnectionState } from '../client.types.js'
|
||||||
import { SerializedError } from './errors.js'
|
import { SerializedError } from './errors.js'
|
||||||
|
|
||||||
export type WorkerInboundMessage = {
|
export type WorkerInboundMessage =
|
||||||
type: 'invoke'
|
| {
|
||||||
id: number
|
type: 'invoke'
|
||||||
target: 'custom' | 'client' | 'storage' | 'storage-self' | 'storage-peers' | 'app-config'
|
id: number
|
||||||
method: string
|
target: 'custom' | 'client' | 'storage' | 'storage-self' | 'storage-peers' | 'app-config'
|
||||||
args: unknown[]
|
method: string
|
||||||
void: boolean
|
args: unknown[]
|
||||||
}
|
void: boolean
|
||||||
|
withAbort: boolean
|
||||||
|
}
|
||||||
|
| {
|
||||||
|
type: 'abort'
|
||||||
|
id: number
|
||||||
|
}
|
||||||
|
|
||||||
export type WorkerOutboundMessage =
|
export type WorkerOutboundMessage =
|
||||||
| { type: 'server_update'; update: tl.TypeUpdates }
|
| { type: 'server_update'; update: tl.TypeUpdates }
|
||||||
|
|
|
@ -13,12 +13,22 @@ export abstract class TelegramWorker<T extends WorkerCustomMethods> {
|
||||||
|
|
||||||
abstract registerWorker(handler: WorkerMessageHandler): RespondFn
|
abstract registerWorker(handler: WorkerMessageHandler): RespondFn
|
||||||
|
|
||||||
|
readonly pendingAborts = new Map<number, AbortController>()
|
||||||
|
|
||||||
constructor(readonly params: TelegramWorkerOptions<T>) {
|
constructor(readonly params: TelegramWorkerOptions<T>) {
|
||||||
this.broadcast = this.registerWorker((message, respond) => {
|
this.broadcast = this.registerWorker((message, respond) => {
|
||||||
switch (message.type) {
|
switch (message.type) {
|
||||||
case 'invoke':
|
case 'invoke':
|
||||||
this.onInvoke(message, respond)
|
this.onInvoke(message, respond)
|
||||||
break
|
break
|
||||||
|
case 'abort': {
|
||||||
|
const abort = this.pendingAborts.get(message.id)
|
||||||
|
|
||||||
|
if (abort) {
|
||||||
|
abort.abort()
|
||||||
|
this.pendingAborts.delete(message.id)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -116,9 +126,28 @@ export abstract class TelegramWorker<T extends WorkerCustomMethods> {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let args = msg.args
|
||||||
|
|
||||||
|
if (msg.target === 'client' && msg.method === 'call' && msg.withAbort) {
|
||||||
|
const abort = new AbortController()
|
||||||
|
this.pendingAborts.set(msg.id, abort)
|
||||||
|
|
||||||
|
args = [
|
||||||
|
args[0],
|
||||||
|
{
|
||||||
|
...(args[1] as object),
|
||||||
|
abortSignal: abort.signal,
|
||||||
|
},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
|
||||||
Promise.resolve(method.apply(target, msg.args))
|
Promise.resolve(method.apply(target, args))
|
||||||
.then((res) => {
|
.then((res) => {
|
||||||
|
if (msg.withAbort) {
|
||||||
|
this.pendingAborts.delete(msg.id)
|
||||||
|
}
|
||||||
|
|
||||||
if (msg.void) return
|
if (msg.void) return
|
||||||
|
|
||||||
respond({
|
respond({
|
||||||
|
@ -128,6 +157,10 @@ export abstract class TelegramWorker<T extends WorkerCustomMethods> {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
.catch((err) => {
|
.catch((err) => {
|
||||||
|
if (msg.withAbort) {
|
||||||
|
this.pendingAborts.delete(msg.id)
|
||||||
|
}
|
||||||
|
|
||||||
respond({
|
respond({
|
||||||
type: 'result',
|
type: 'result',
|
||||||
id: msg.id,
|
id: msg.id,
|
||||||
|
|
Loading…
Reference in a new issue