chore: worker

This commit is contained in:
alina 🌸 2024-03-03 09:45:54 +03:00
parent 791a3a58b5
commit cbc2002781
Signed by: teidesu
SSH key fingerprint: SHA256:uNeCpw6aTSU4aIObXLvHfLkDa82HWH9EiOj9AXOIRpI
19 changed files with 320 additions and 273 deletions

View file

@ -16,6 +16,7 @@
".": "./src/index.ts",
"./utils.js": "./src/utils/index.ts",
"./client.js": "./src/highlevel/client.ts",
"./worker.js": "./src/highlevel/worker/index.ts",
"./methods.js": "./src/highlevel/methods.ts",
"./platform.js": "./src/platform.ts"
},
@ -40,6 +41,10 @@
"./client.js": {
"import": "./esm/highlevel/client.js",
"require": "./cjs/highlevel/client.js"
},
"./worker.js": {
"import": "./esm/highlevel/worker/index.js",
"require": "./cjs/highlevel/worker/index.js"
}
}
},

View file

@ -689,6 +689,7 @@ on(name: string, handler: (...args: any[]) => void): this\n`)
output.write('}\n')
output.write('\nexport type { TelegramClientOptions }\n')
output.write('\nexport * from "./base.js"\n')
output.write('\nexport class TelegramClient extends EventEmitter implements ITelegramClient {\n')
output.write(' _client: ITelegramClient\n')

View file

@ -5135,6 +5135,8 @@ export interface TelegramClient extends ITelegramClient {
export type { TelegramClientOptions }
export * from './base.js'
export class TelegramClient extends EventEmitter implements ITelegramClient {
_client: ITelegramClient
constructor(opts: TelegramClientOptions) {

View file

@ -1,4 +1,3 @@
export * from './base.js'
export * from './client.types.js'
export * from './storage/index.js'
export * from './types/index.js'

View file

@ -0,0 +1,3 @@
export * from './port.js'
export * from './protocol.js'
export * from './worker.js'

View file

@ -1,20 +0,0 @@
import { Worker } from 'worker_threads'
import { ClientMessageHandler, SendFn, SomeWorker } from '../protocol.js'
export function connectToWorker(worker: SomeWorker, handler: ClientMessageHandler): [SendFn, () => void] {
if (!(worker instanceof Worker)) {
throw new Error('Only worker_threads are supported')
}
const send: SendFn = worker.postMessage.bind(worker)
worker.on('message', handler)
return [
send,
() => {
worker.off('message', handler)
},
]
}

View file

@ -1,61 +0,0 @@
import { getPlatform } from '../../../platform.js'
import { ClientMessageHandler, SendFn, SomeWorker } from '../protocol.js'
export function connectToWorker(worker: SomeWorker, handler: ClientMessageHandler): [SendFn, () => void] {
if (worker instanceof Worker) {
const send: SendFn = worker.postMessage.bind(worker)
const messageHandler = (ev: MessageEvent) => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
handler(ev.data)
}
worker.addEventListener('message', messageHandler)
return [
send,
() => {
worker.removeEventListener('message', messageHandler)
},
]
}
if (worker instanceof SharedWorker) {
const send: SendFn = worker.port.postMessage.bind(worker.port)
const pingInterval = setInterval(() => {
worker.port.postMessage({ __type__: 'ping' })
}, 10000)
const messageHandler = (ev: MessageEvent) => {
if (ev.data.__type__ === 'timeout') {
// we got disconnected from the worker due to timeout
// if the page is still alive (which is unlikely), we should reconnect
// however it's not really possible with SharedWorker API without re-creating the worker
// so we just reload the page for now
location.reload()
return
}
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
handler(ev.data)
}
worker.port.addEventListener('message', messageHandler)
worker.port.start()
const close = () => {
clearInterval(pingInterval)
worker.port.postMessage({ __type__: 'close' })
worker.port.removeEventListener('message', messageHandler)
worker.port.close()
}
getPlatform().beforeExit(close)
return [send, close]
}
throw new Error('Only workers and shared workers are supported')
}

View file

@ -1,23 +0,0 @@
import { parentPort } from 'worker_threads'
import { RespondFn, WorkerMessageHandler } from '../protocol.js'
const registered = false
export function registerWorker(handler: WorkerMessageHandler): RespondFn {
if (!parentPort) {
throw new Error('registerWorker() must be called from a worker thread')
}
if (registered) {
throw new Error('registerWorker() must be called only once')
}
const port = parentPort
const respond: RespondFn = port.postMessage.bind(port)
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
parentPort.on('message', (message) => handler(message, respond))
return respond
}

View file

@ -1,80 +0,0 @@
import { RespondFn, WorkerMessageHandler } from '../protocol.js'
const registered = false
export function registerWorker(handler: WorkerMessageHandler): RespondFn {
if (registered) {
throw new Error('registerWorker() must be called only once')
}
if (typeof WorkerGlobalScope !== 'undefined' && self instanceof WorkerGlobalScope) {
const respond: RespondFn = self.postMessage.bind(self)
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
self.addEventListener('message', (message) => handler(message.data, respond))
return respond
}
if (typeof SharedWorkerGlobalScope !== 'undefined' && self instanceof SharedWorkerGlobalScope) {
const connections: MessagePort[] = []
const broadcast = (message: unknown) => {
for (const port of connections) {
port.postMessage(message)
}
}
self.onconnect = (event) => {
const port = event.ports[0]
connections.push(port)
const respond = port.postMessage.bind(port)
// not very reliable, but better than nothing
// SharedWorker API doesn't provide a way to detect when the client closes the connection
// so we just assume that the client is done when it sends a 'close' message
// and keep a timeout for the case when the client closes without sending a 'close' message
const onClose = () => {
port.close()
const idx = connections.indexOf(port)
if (idx >= 0) {
connections.splice(connections.indexOf(port), 1)
}
}
const onTimeout = () => {
console.warn('some connection timed out!')
respond({ __type__: 'timeout' })
onClose()
}
// 60s should be a reasonable timeout considering that the client should send a ping every 10s
// so even if the browser has suspended the timers, we should still get a ping within a minute
let timeout = setTimeout(onTimeout, 60000)
port.addEventListener('message', (message) => {
if (message.data.__type__ === 'close') {
onClose()
return
}
if (message.data.__type__ === 'ping') {
clearTimeout(timeout)
timeout = setTimeout(onTimeout, 60000)
return
}
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
handler(message.data, respond)
})
}
return broadcast
}
throw new Error('registerWorker() must be called from a worker')
}

View file

@ -6,17 +6,18 @@ import { PeersIndex } from '../types/peers/peers-index.js'
import { RawUpdateHandler } from '../updates/types.js'
import { AppConfigManagerProxy } from './app-config.js'
import { WorkerInvoker } from './invoker.js'
import { connectToWorker } from './platform/connect.js'
import { ClientMessageHandler, SomeWorker, WorkerCustomMethods } from './protocol.js'
import { ClientMessageHandler, SendFn, SomeWorker, WorkerCustomMethods } from './protocol.js'
import { TelegramStorageProxy } from './storage.js'
export interface TelegramWorkerPortOptions {
worker: SomeWorker
}
export class TelegramWorkerPort<Custom extends WorkerCustomMethods> implements ITelegramClient {
export abstract class TelegramWorkerPort<Custom extends WorkerCustomMethods> implements ITelegramClient {
constructor(readonly options: TelegramWorkerPortOptions) {}
abstract connectToWorker(worker: SomeWorker, handler: ClientMessageHandler): [SendFn, () => void]
readonly log = new LogManager('worker')
private _serverUpdatesHandler: (updates: tl.TypeUpdates) => void = () => {}
@ -60,7 +61,7 @@ export class TelegramWorkerPort<Custom extends WorkerCustomMethods> implements I
}
}
private _connection = connectToWorker(this.options.worker, this._onMessage)
private _connection = this.connectToWorker(this.options.worker, this._onMessage)
private _invoker = new WorkerInvoker(this._connection[0])
private _bind = this._invoker.makeBinder<ITelegramClient>('client')

View file

@ -1,40 +1,88 @@
import { BaseTelegramClient, BaseTelegramClientOptions } from '../base.js'
import { BaseTelegramClient } from '../base.js'
import { serializeError } from './errors.js'
import { registerWorker } from './platform/register.js'
import { RespondFn, WorkerCustomMethods, WorkerInboundMessage, WorkerMessageHandler } from './protocol.js'
export interface TelegramWorkerOptions<T extends WorkerCustomMethods> {
client: BaseTelegramClient | BaseTelegramClientOptions
client: BaseTelegramClient
customMethods?: T
}
export function makeTelegramWorker<T extends WorkerCustomMethods>(params: TelegramWorkerOptions<T>) {
const { client: client_, customMethods } = params
export abstract class TelegramWorker<T extends WorkerCustomMethods> {
readonly client: BaseTelegramClient
readonly broadcast: RespondFn
const client = client_ instanceof BaseTelegramClient ? client_ : new BaseTelegramClient(client_)
abstract registerWorker(handler: WorkerMessageHandler): RespondFn
const onInvoke = (msg: Extract<WorkerInboundMessage, { type: 'invoke' }>, respond: RespondFn) => {
constructor(readonly params: TelegramWorkerOptions<T>) {
this.broadcast = this.registerWorker((message, respond) => {
switch (message.type) {
case 'invoke':
this.onInvoke(message, respond)
break
}
})
const client = params.client
this.client = client
client.log.mgr.handler = (color, level, tag, fmt, args) =>
this.broadcast({
type: 'log',
color,
level,
tag,
fmt,
args,
})
client.onError((err) =>
this.broadcast({
type: 'error',
error: err,
}),
)
if (client.updates) {
client.onUpdate((update, peers) =>
this.broadcast({
type: 'update',
update,
users: peers.users,
chats: peers.chats,
hasMin: peers.hasMin,
}),
)
} else {
client.onServerUpdate((update) =>
this.broadcast({
type: 'server_update',
update,
}),
)
}
}
private onInvoke(msg: Extract<WorkerInboundMessage, { type: 'invoke' }>, respond: RespondFn) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let target: any
switch (msg.target) {
case 'custom':
target = customMethods
target = this.params.customMethods
break
case 'client':
target = client
target = this.client
break
case 'storage':
target = client.storage
target = this.client.storage
break
case 'storage-self':
target = client.storage.self
target = this.client.storage.self
break
case 'storage-peers':
target = client.storage.peers
target = this.client.storage.peers
break
case 'app-config':
target = client.appConfig
target = this.client.appConfig
break
default: {
@ -80,49 +128,4 @@ export function makeTelegramWorker<T extends WorkerCustomMethods>(params: Telegr
})
})
}
const onMessage: WorkerMessageHandler = (message, respond) => {
switch (message.type) {
case 'invoke':
onInvoke(message, respond)
break
}
}
const broadcast = registerWorker(onMessage)
client.log.mgr.handler = (color, level, tag, fmt, args) =>
broadcast({
type: 'log',
color,
level,
tag,
fmt,
args,
})
client.onError((err) =>
broadcast({
type: 'error',
error: err,
}),
)
if (client.updates) {
client.onUpdate((update, peers) =>
broadcast({
type: 'update',
update,
users: peers.users,
chats: peers.chats,
hasMin: peers.hasMin,
}),
)
} else {
client.onServerUpdate((update) =>
broadcast({
type: 'server_update',
update,
}),
)
}
}

View file

@ -1,8 +1,13 @@
import { createRequire } from 'module'
import { createInterface, Interface as RlInterface } from 'readline'
import { FileDownloadLocation, FileDownloadParameters, User } from '@mtcute/core'
import { TelegramClient as TelegramClientBase, TelegramClientOptions } from '@mtcute/core/client.js'
import { FileDownloadLocation, FileDownloadParameters, ITelegramStorageProvider, PartialOnly, User } from '@mtcute/core'
import {
BaseTelegramClient as BaseTelegramClientBase,
BaseTelegramClientOptions as BaseTelegramClientOptionsBase,
TelegramClient as TelegramClientBase,
TelegramClientOptions,
} from '@mtcute/core/client.js'
import { setPlatform } from '@mtcute/core/platform.js'
import { SqliteStorage } from '@mtcute/sqlite'
@ -25,19 +30,23 @@ try {
nativeCrypto = require('@mtcute/crypto-node').NodeNativeCryptoProvider
} catch (e) {}
/**
* Telegram client for use in Node.js
export interface BaseTelegramClientOptions
extends PartialOnly<Omit<BaseTelegramClientOptionsBase, 'storage'>, 'transport' | 'crypto'> {
/**
* Storage to use for this client.
*
* If a string is passed, it will be used as
* a name for an SQLite database file.
*
* @default `"client.session"`
*/
export class TelegramClient extends TelegramClientBase {
constructor(opts: TelegramClientOptions) {
storage?: string | ITelegramStorageProvider
}
export class BaseTelegramClient extends BaseTelegramClientBase {
constructor(opts: BaseTelegramClientOptions) {
setPlatform(new NodePlatform())
if ('client' in opts) {
super(opts)
return
}
super({
// eslint-disable-next-line
crypto: nativeCrypto ? new nativeCrypto() : new NodeCryptoProvider(),
@ -49,6 +58,23 @@ export class TelegramClient extends TelegramClientBase {
opts.storage ?? new SqliteStorage('client.session'),
})
}
}
/**
* Telegram client for use in Node.js
*/
export class TelegramClient extends TelegramClientBase {
constructor(opts: TelegramClientOptions) {
if ('client' in opts) {
super(opts)
return
}
super({
client: new BaseTelegramClient(opts),
})
}
private _rl?: RlInterface
@ -120,10 +146,7 @@ export class TelegramClient extends TelegramClientBase {
return downloadToFile(this, filename, location, params)
}
downloadAsNodeStream(
location: FileDownloadLocation,
params?: FileDownloadParameters | undefined,
) {
downloadAsNodeStream(location: FileDownloadLocation, params?: FileDownloadParameters | undefined) {
return downloadAsNodeStream(this, location, params)
}
}

View file

@ -2,6 +2,7 @@ export * from './client.js'
export * from './platform.js'
export * from './utils/crypto.js'
export * from './utils/tcp.js'
export * from './worker.js'
export * from '@mtcute/core'
export * from '@mtcute/html-parser'
export * from '@mtcute/markdown-parser'

View file

@ -0,0 +1,67 @@
import { parentPort, Worker } from 'worker_threads'
import { setPlatform } from '@mtcute/core/platform.js'
import {
ClientMessageHandler,
RespondFn,
SendFn,
SomeWorker,
TelegramWorker as TelegramWorkerBase,
TelegramWorkerOptions,
TelegramWorkerPort as TelegramWorkerPortBase,
TelegramWorkerPortOptions,
WorkerCustomMethods,
WorkerMessageHandler,
} from '@mtcute/core/worker.js'
import { NodePlatform } from './platform.js'
export type { TelegramWorkerOptions, TelegramWorkerPortOptions, WorkerCustomMethods }
let _registered = false
export class TelegramWorker<T extends WorkerCustomMethods> extends TelegramWorkerBase<T> {
registerWorker(handler: WorkerMessageHandler): RespondFn {
if (!parentPort) {
throw new Error('TelegramWorker must be created from a worker thread')
}
if (_registered) {
throw new Error('TelegramWorker must be created only once')
}
_registered = true
const port = parentPort
const respond: RespondFn = port.postMessage.bind(port)
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
parentPort.on('message', (message) => handler(message, respond))
return respond
}
}
export class TelegramWorkerPort<T extends WorkerCustomMethods> extends TelegramWorkerPortBase<T> {
constructor(readonly options: TelegramWorkerPortOptions) {
setPlatform(new NodePlatform())
super(options)
}
connectToWorker(worker: SomeWorker, handler: ClientMessageHandler): [SendFn, () => void] {
if (!(worker instanceof Worker)) {
throw new Error('Only worker_threads are supported')
}
const send: SendFn = worker.postMessage.bind(worker)
worker.on('message', handler)
return [
send,
() => {
worker.off('message', handler)
},
]
}
}

View file

@ -1,11 +1,5 @@
import {
BaseTelegramClient,
BaseTelegramClientOptions,
MaybePromise,
MustEqual,
RpcCallOptions,
tl,
} from '@mtcute/core'
import { MaybePromise, MustEqual, RpcCallOptions, tl } from '@mtcute/core'
import { BaseTelegramClient, BaseTelegramClientOptions } from '@mtcute/core/client.js'
import { defaultCryptoProvider } from './platform.js'
import { StubMemoryTelegramStorage } from './storage.js'

View file

@ -1,6 +1,7 @@
import { describe, expect, it, vi } from 'vitest'
import { BaseTelegramClient, MemoryStorage } from '@mtcute/core'
import { MemoryStorage } from '@mtcute/core'
import { BaseTelegramClient } from '@mtcute/core/client.js'
import { defaultCryptoProvider } from './platform.js'
import { createStub } from './stub.js'

View file

@ -0,0 +1,61 @@
import { ITelegramStorageProvider, PartialOnly } from '@mtcute/core'
import {
BaseTelegramClient as BaseTelegramClientBase,
BaseTelegramClientOptions as BaseTelegramClientOptionsBase,
TelegramClient as TelegramClientBase,
TelegramClientOptions,
} from '@mtcute/core/client.js'
import { setPlatform } from '@mtcute/core/platform.js'
import { WebCryptoProvider } from './crypto.js'
import { IdbStorage } from './idb/index.js'
import { WebPlatform } from './platform.js'
import { WebSocketTransport } from './websocket.js'
export type { TelegramClientOptions }
export interface BaseTelegramClientOptions
extends PartialOnly<Omit<BaseTelegramClientOptionsBase, 'storage'>, 'transport' | 'crypto'> {
/**
* Storage to use for this client.
*
* If a string is passed, it will be used as
* a name for an IndexedDB database.
*
* @default `"client.session"`
*/
storage?: string | ITelegramStorageProvider
}
export class BaseTelegramClient extends BaseTelegramClientBase {
constructor(opts: BaseTelegramClientOptions) {
setPlatform(new WebPlatform())
super({
crypto: new WebCryptoProvider(),
transport: () => new WebSocketTransport(),
...opts,
storage:
typeof opts.storage === 'string' ?
new IdbStorage(opts.storage) :
opts.storage ?? new IdbStorage('client.session'),
})
}
}
/**
* Telegram client for use in Node.js
*/
export class TelegramClient extends TelegramClientBase {
constructor(opts: TelegramClientOptions) {
if ('client' in opts) {
super(opts)
return
}
super({
client: new BaseTelegramClient(opts),
})
}
}

View file

@ -1,3 +1,6 @@
export * from './client.js'
export * from './crypto.js'
export * from './idb/index.js'
export * from './platform.js'
export * from './worker.js'
export * from '@mtcute/core'

View file

@ -0,0 +1,67 @@
import { parentPort, Worker } from 'worker_threads'
import { setPlatform } from '@mtcute/core/platform.js'
import {
ClientMessageHandler,
RespondFn,
SendFn,
SomeWorker,
TelegramWorker as TelegramWorkerBase,
TelegramWorkerOptions,
TelegramWorkerPort as TelegramWorkerPortBase,
TelegramWorkerPortOptions,
WorkerCustomMethods,
WorkerMessageHandler,
} from '@mtcute/core/worker.js'
import { WebPlatform } from './platform.js'
export type { TelegramWorkerOptions, TelegramWorkerPortOptions, WorkerCustomMethods }
let _registered = false
export class TelegramWorker<T extends WorkerCustomMethods> extends TelegramWorkerBase<T> {
registerWorker(handler: WorkerMessageHandler): RespondFn {
if (!parentPort) {
throw new Error('TelegramWorker must be created from a worker thread')
}
if (_registered) {
throw new Error('TelegramWorker must be created only once')
}
_registered = true
const port = parentPort
const respond: RespondFn = port.postMessage.bind(port)
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
parentPort.on('message', (message) => handler(message, respond))
return respond
}
}
export class TelegramWorkerPort<T extends WorkerCustomMethods> extends TelegramWorkerPortBase<T> {
constructor(readonly options: TelegramWorkerPortOptions) {
setPlatform(new WebPlatform())
super(options)
}
connectToWorker(worker: SomeWorker, handler: ClientMessageHandler): [SendFn, () => void] {
if (!(worker instanceof Worker)) {
throw new Error('Only worker_threads are supported')
}
const send: SendFn = worker.postMessage.bind(worker)
worker.on('message', handler)
return [
send,
() => {
worker.off('message', handler)
},
]
}
}