fix(core): serializing Longs between worker and port #63
6 changed files with 104 additions and 21 deletions
|
@ -1,7 +1,7 @@
|
|||
import { assertEquals, assertGreater, assertInstanceOf } from 'https://deno.land/std@0.223.0/assert/mod.ts'
|
||||
|
||||
import { TelegramClient } from '@mtcute/core/client.js'
|
||||
import { Message, TelegramWorkerPort, tl } from '@mtcute/deno'
|
||||
import { Long, Message, TelegramWorkerPort, tl } from '@mtcute/deno'
|
||||
|
||||
import { getApiParams, waitFor } from '../utils.ts'
|
||||
import type { CustomMethods } from './_worker.ts'
|
||||
|
@ -18,8 +18,11 @@ Deno.test('5. worker', { sanitizeResources: false }, async (t) => {
|
|||
|
||||
await t.step('should make api calls', async function () {
|
||||
const res = await port.call({ _: 'help.getConfig' })
|
||||
|
||||
assertEquals(res._, 'config')
|
||||
|
||||
const premiumPromo = await port.call({ _: 'help.getPremiumPromo' })
|
||||
// ensure Long-s are correctly serialized
|
||||
assertEquals(Long.isLong((premiumPromo.users[0] as tl.RawUser).accessHash), true)
|
||||
})
|
||||
|
||||
await t.step('should call custom methods', async function () {
|
||||
|
@ -47,6 +50,8 @@ Deno.test('5. worker', { sanitizeResources: false }, async (t) => {
|
|||
await port.startUpdatesLoop()
|
||||
|
||||
const me = await portClient.getMe()
|
||||
// ensure Long-s are correctly serialized
|
||||
assertEquals(Long.isLong(me.raw.accessHash), true)
|
||||
let username = me.username
|
||||
|
||||
if (!username) {
|
||||
|
|
|
@ -5,7 +5,7 @@ import path from 'path'
|
|||
import { Worker } from 'worker_threads'
|
||||
|
||||
import { TelegramClient } from '@mtcute/core/client.js'
|
||||
import { Message, TelegramWorkerPort, tl } from '@mtcute/node'
|
||||
import { Long, Message, TelegramWorkerPort, tl } from '@mtcute/node'
|
||||
|
||||
import { getApiParams, waitFor } from '../utils.js'
|
||||
import type { CustomMethods } from './_worker.js'
|
||||
|
@ -22,8 +22,11 @@ describe('5. worker', async function () {
|
|||
|
||||
it('should make api calls', async function () {
|
||||
const res = await port.call({ _: 'help.getConfig' })
|
||||
|
||||
expect(res._).to.equal('config')
|
||||
|
||||
const premiumPromo = await port.call({ _: 'help.getPremiumPromo' })
|
||||
// ensure Long-s are correctly serialized
|
||||
expect(Long.isLong((premiumPromo.users[0] as tl.RawUser).accessHash)).to.equal(true)
|
||||
})
|
||||
|
||||
it('should call custom methods', async function () {
|
||||
|
@ -51,6 +54,9 @@ describe('5. worker', async function () {
|
|||
await port.startUpdatesLoop()
|
||||
|
||||
const me = await portClient.getMe()
|
||||
// ensure Long-s are correctly serialized
|
||||
expect(Long.isLong(me.raw.accessHash)).equals(true)
|
||||
|
||||
let username = me.username
|
||||
|
||||
if (!username) {
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import { ControllablePromise, createControllablePromise } from '../../utils/controllable-promise.js'
|
||||
import { deserializeError } from './errors.js'
|
||||
import { SendFn, WorkerInboundMessage, WorkerOutboundMessage } from './protocol.js'
|
||||
import { deserializeResult, SendFn, WorkerInboundMessage, WorkerOutboundMessage } from './protocol.js'
|
||||
|
||||
export type InvokeTarget = Extract<WorkerInboundMessage, { type: 'invoke' }>['target']
|
||||
|
||||
|
@ -61,7 +61,7 @@ export class WorkerInvoker {
|
|||
if (msg.error) {
|
||||
promise.reject(deserializeError(msg.error))
|
||||
} else {
|
||||
promise.resolve(msg.result)
|
||||
promise.resolve(deserializeResult(msg.result!))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -8,7 +8,7 @@ import { PeersIndex } from '../types/peers/peers-index.js'
|
|||
import { RawUpdateHandler } from '../updates/types.js'
|
||||
import { AppConfigManagerProxy } from './app-config.js'
|
||||
import { WorkerInvoker } from './invoker.js'
|
||||
import { ClientMessageHandler, SendFn, SomeWorker, WorkerCustomMethods } from './protocol.js'
|
||||
import { ClientMessageHandler, deserializeResult, SendFn, SomeWorker, WorkerCustomMethods } from './protocol.js'
|
||||
import { TelegramStorageProxy } from './storage.js'
|
||||
|
||||
export interface TelegramWorkerPortOptions {
|
||||
|
@ -129,15 +129,15 @@ export abstract class TelegramWorkerPort<Custom extends WorkerCustomMethods> imp
|
|||
this.log.handler(message.color, message.level, message.tag, message.fmt, message.args)
|
||||
break
|
||||
case 'server_update':
|
||||
this._serverUpdatesHandler(message.update)
|
||||
this._serverUpdatesHandler(deserializeResult(message.update))
|
||||
break
|
||||
case 'conn_state':
|
||||
this._connectionStateHandler(message.state)
|
||||
break
|
||||
case 'update': {
|
||||
const peers = new PeersIndex(message.users, message.chats)
|
||||
const peers = new PeersIndex(deserializeResult(message.users), deserializeResult(message.chats))
|
||||
peers.hasMin = message.hasMin
|
||||
this._updateHandler(message.update, peers)
|
||||
this._updateHandler(deserializeResult(message.update), peers)
|
||||
break
|
||||
}
|
||||
case 'result':
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import Long from 'long'
|
||||
import type { Worker as NodeWorker } from 'node:worker_threads'
|
||||
|
||||
import { tl } from '@mtcute/tl'
|
||||
|
@ -21,12 +22,12 @@ export type WorkerInboundMessage =
|
|||
}
|
||||
|
||||
export type WorkerOutboundMessage =
|
||||
| { type: 'server_update'; update: tl.TypeUpdates }
|
||||
| { type: 'server_update'; update: SerializedResult<tl.TypeUpdates> }
|
||||
| {
|
||||
type: 'update'
|
||||
update: tl.TypeUpdate
|
||||
users: Map<number, tl.TypeUser>
|
||||
chats: Map<number, tl.TypeChat>
|
||||
update: SerializedResult<tl.TypeUpdate>
|
||||
users: SerializedResult<Map<number, tl.TypeUser>>
|
||||
chats: SerializedResult<Map<number, tl.TypeChat>>
|
||||
hasMin: boolean
|
||||
}
|
||||
| { type: 'error'; error: unknown }
|
||||
|
@ -43,7 +44,7 @@ export type WorkerOutboundMessage =
|
|||
| {
|
||||
type: 'result'
|
||||
id: number
|
||||
result?: unknown
|
||||
result?: SerializedResult<unknown>
|
||||
error?: SerializedError
|
||||
}
|
||||
|
||||
|
@ -57,3 +58,68 @@ export type WorkerMessageHandler = (message: WorkerInboundMessage, respond: Resp
|
|||
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
export type WorkerCustomMethods = Record<string, (...args: any[]) => Promise<any>>
|
||||
|
||||
export type SerializedResult<T> = { __serialized__: T }
|
||||
|
||||
export function serializeResult<T>(result: T): SerializedResult<T> {
|
||||
if (Array.isArray(result)) {
|
||||
return result.map(serializeResult) as unknown as SerializedResult<T>
|
||||
}
|
||||
|
||||
if (result instanceof Map) {
|
||||
for (const [key, value] of result.entries()) {
|
||||
result.set(key, serializeResult(value))
|
||||
}
|
||||
|
||||
return result as unknown as SerializedResult<T>
|
||||
}
|
||||
|
||||
if (result && typeof result === 'object') {
|
||||
// replace Long instances with a special object
|
||||
for (const [key, value] of Object.entries(result)) {
|
||||
if (Long.isLong(value)) {
|
||||
// eslint-disable-next-line
|
||||
;(result as any)[key] = {
|
||||
__type: 'long',
|
||||
low: value.low,
|
||||
high: value.high,
|
||||
unsigned: value.unsigned,
|
||||
}
|
||||
} else {
|
||||
// eslint-disable-next-line
|
||||
;(result as any)[key] = serializeResult(value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result as unknown as SerializedResult<T>
|
||||
}
|
||||
|
||||
export function deserializeResult<T>(result: SerializedResult<T>): T {
|
||||
if (Array.isArray(result)) {
|
||||
return result.map(deserializeResult) as unknown as T
|
||||
}
|
||||
|
||||
if (result instanceof Map) {
|
||||
for (const [key, value] of result.entries()) {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
|
||||
result.set(key, deserializeResult(value))
|
||||
}
|
||||
|
||||
return result as unknown as T
|
||||
}
|
||||
|
||||
if (result && typeof result === 'object') {
|
||||
for (const [key, value] of Object.entries(result)) {
|
||||
if (value && typeof value === 'object' && (value as Record<string, string>).__type === 'long') {
|
||||
// eslint-disable-next-line
|
||||
;(result as any)[key] = Long.fromValue(value as unknown as Long)
|
||||
} else {
|
||||
// eslint-disable-next-line
|
||||
;(result as any)[key] = deserializeResult(value as SerializedResult<unknown>)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result as unknown as T
|
||||
}
|
||||
|
|
|
@ -1,6 +1,12 @@
|
|||
import { BaseTelegramClient } from '../base.js'
|
||||
import { serializeError } from './errors.js'
|
||||
import { RespondFn, WorkerCustomMethods, WorkerInboundMessage, WorkerMessageHandler } from './protocol.js'
|
||||
import {
|
||||
RespondFn,
|
||||
serializeResult,
|
||||
WorkerCustomMethods,
|
||||
WorkerInboundMessage,
|
||||
WorkerMessageHandler,
|
||||
} from './protocol.js'
|
||||
|
||||
export interface TelegramWorkerOptions<T extends WorkerCustomMethods> {
|
||||
client: BaseTelegramClient
|
||||
|
@ -62,9 +68,9 @@ export abstract class TelegramWorker<T extends WorkerCustomMethods> {
|
|||
client.onUpdate((update, peers) =>
|
||||
this.broadcast({
|
||||
type: 'update',
|
||||
update,
|
||||
users: peers.users,
|
||||
chats: peers.chats,
|
||||
update: serializeResult(update),
|
||||
users: serializeResult(peers.users),
|
||||
chats: serializeResult(peers.chats),
|
||||
hasMin: peers.hasMin,
|
||||
}),
|
||||
)
|
||||
|
@ -72,7 +78,7 @@ export abstract class TelegramWorker<T extends WorkerCustomMethods> {
|
|||
client.onServerUpdate((update) =>
|
||||
this.broadcast({
|
||||
type: 'server_update',
|
||||
update,
|
||||
update: serializeResult(update),
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
@ -153,7 +159,7 @@ export abstract class TelegramWorker<T extends WorkerCustomMethods> {
|
|||
respond({
|
||||
type: 'result',
|
||||
id: msg.id,
|
||||
result: res,
|
||||
result: serializeResult(res),
|
||||
})
|
||||
})
|
||||
.catch((err) => {
|
||||
|
|
Loading…
Reference in a new issue