fix(core): serializing Longs between worker and port #63

Merged
teidesu merged 1 commit from worker-longs into master 2024-07-15 23:30:40 +03:00
6 changed files with 104 additions and 21 deletions

View file

@ -1,7 +1,7 @@
import { assertEquals, assertGreater, assertInstanceOf } from 'https://deno.land/std@0.223.0/assert/mod.ts' import { assertEquals, assertGreater, assertInstanceOf } from 'https://deno.land/std@0.223.0/assert/mod.ts'
import { TelegramClient } from '@mtcute/core/client.js' import { TelegramClient } from '@mtcute/core/client.js'
import { Message, TelegramWorkerPort, tl } from '@mtcute/deno' import { Long, Message, TelegramWorkerPort, tl } from '@mtcute/deno'
import { getApiParams, waitFor } from '../utils.ts' import { getApiParams, waitFor } from '../utils.ts'
import type { CustomMethods } from './_worker.ts' import type { CustomMethods } from './_worker.ts'
@ -18,8 +18,11 @@ Deno.test('5. worker', { sanitizeResources: false }, async (t) => {
await t.step('should make api calls', async function () { await t.step('should make api calls', async function () {
const res = await port.call({ _: 'help.getConfig' }) const res = await port.call({ _: 'help.getConfig' })
assertEquals(res._, 'config') assertEquals(res._, 'config')
const premiumPromo = await port.call({ _: 'help.getPremiumPromo' })
// ensure Long-s are correctly serialized
assertEquals(Long.isLong((premiumPromo.users[0] as tl.RawUser).accessHash), true)
}) })
await t.step('should call custom methods', async function () { await t.step('should call custom methods', async function () {
@ -47,6 +50,8 @@ Deno.test('5. worker', { sanitizeResources: false }, async (t) => {
await port.startUpdatesLoop() await port.startUpdatesLoop()
const me = await portClient.getMe() const me = await portClient.getMe()
// ensure Long-s are correctly serialized
assertEquals(Long.isLong(me.raw.accessHash), true)
let username = me.username let username = me.username
if (!username) { if (!username) {

View file

@ -5,7 +5,7 @@ import path from 'path'
import { Worker } from 'worker_threads' import { Worker } from 'worker_threads'
import { TelegramClient } from '@mtcute/core/client.js' import { TelegramClient } from '@mtcute/core/client.js'
import { Message, TelegramWorkerPort, tl } from '@mtcute/node' import { Long, Message, TelegramWorkerPort, tl } from '@mtcute/node'
import { getApiParams, waitFor } from '../utils.js' import { getApiParams, waitFor } from '../utils.js'
import type { CustomMethods } from './_worker.js' import type { CustomMethods } from './_worker.js'
@ -22,8 +22,11 @@ describe('5. worker', async function () {
it('should make api calls', async function () { it('should make api calls', async function () {
const res = await port.call({ _: 'help.getConfig' }) const res = await port.call({ _: 'help.getConfig' })
expect(res._).to.equal('config') expect(res._).to.equal('config')
const premiumPromo = await port.call({ _: 'help.getPremiumPromo' })
// ensure Long-s are correctly serialized
expect(Long.isLong((premiumPromo.users[0] as tl.RawUser).accessHash)).to.equal(true)
}) })
it('should call custom methods', async function () { it('should call custom methods', async function () {
@ -51,6 +54,9 @@ describe('5. worker', async function () {
await port.startUpdatesLoop() await port.startUpdatesLoop()
const me = await portClient.getMe() const me = await portClient.getMe()
// ensure Long-s are correctly serialized
expect(Long.isLong(me.raw.accessHash)).equals(true)
let username = me.username let username = me.username
if (!username) { if (!username) {

View file

@ -1,6 +1,6 @@
import { ControllablePromise, createControllablePromise } from '../../utils/controllable-promise.js' import { ControllablePromise, createControllablePromise } from '../../utils/controllable-promise.js'
import { deserializeError } from './errors.js' import { deserializeError } from './errors.js'
import { SendFn, WorkerInboundMessage, WorkerOutboundMessage } from './protocol.js' import { deserializeResult, SendFn, WorkerInboundMessage, WorkerOutboundMessage } from './protocol.js'
export type InvokeTarget = Extract<WorkerInboundMessage, { type: 'invoke' }>['target'] export type InvokeTarget = Extract<WorkerInboundMessage, { type: 'invoke' }>['target']
@ -61,7 +61,7 @@ export class WorkerInvoker {
if (msg.error) { if (msg.error) {
promise.reject(deserializeError(msg.error)) promise.reject(deserializeError(msg.error))
} else { } else {
promise.resolve(msg.result) promise.resolve(deserializeResult(msg.result!))
} }
} }

View file

@ -8,7 +8,7 @@ import { PeersIndex } from '../types/peers/peers-index.js'
import { RawUpdateHandler } from '../updates/types.js' import { RawUpdateHandler } from '../updates/types.js'
import { AppConfigManagerProxy } from './app-config.js' import { AppConfigManagerProxy } from './app-config.js'
import { WorkerInvoker } from './invoker.js' import { WorkerInvoker } from './invoker.js'
import { ClientMessageHandler, SendFn, SomeWorker, WorkerCustomMethods } from './protocol.js' import { ClientMessageHandler, deserializeResult, SendFn, SomeWorker, WorkerCustomMethods } from './protocol.js'
import { TelegramStorageProxy } from './storage.js' import { TelegramStorageProxy } from './storage.js'
export interface TelegramWorkerPortOptions { export interface TelegramWorkerPortOptions {
@ -129,15 +129,15 @@ export abstract class TelegramWorkerPort<Custom extends WorkerCustomMethods> imp
this.log.handler(message.color, message.level, message.tag, message.fmt, message.args) this.log.handler(message.color, message.level, message.tag, message.fmt, message.args)
break break
case 'server_update': case 'server_update':
this._serverUpdatesHandler(message.update) this._serverUpdatesHandler(deserializeResult(message.update))
break break
case 'conn_state': case 'conn_state':
this._connectionStateHandler(message.state) this._connectionStateHandler(message.state)
break break
case 'update': { case 'update': {
const peers = new PeersIndex(message.users, message.chats) const peers = new PeersIndex(deserializeResult(message.users), deserializeResult(message.chats))
peers.hasMin = message.hasMin peers.hasMin = message.hasMin
this._updateHandler(message.update, peers) this._updateHandler(deserializeResult(message.update), peers)
break break
} }
case 'result': case 'result':

View file

@ -1,3 +1,4 @@
import Long from 'long'
import type { Worker as NodeWorker } from 'node:worker_threads' import type { Worker as NodeWorker } from 'node:worker_threads'
import { tl } from '@mtcute/tl' import { tl } from '@mtcute/tl'
@ -21,12 +22,12 @@ export type WorkerInboundMessage =
} }
export type WorkerOutboundMessage = export type WorkerOutboundMessage =
| { type: 'server_update'; update: tl.TypeUpdates } | { type: 'server_update'; update: SerializedResult<tl.TypeUpdates> }
| { | {
type: 'update' type: 'update'
update: tl.TypeUpdate update: SerializedResult<tl.TypeUpdate>
users: Map<number, tl.TypeUser> users: SerializedResult<Map<number, tl.TypeUser>>
chats: Map<number, tl.TypeChat> chats: SerializedResult<Map<number, tl.TypeChat>>
hasMin: boolean hasMin: boolean
} }
| { type: 'error'; error: unknown } | { type: 'error'; error: unknown }
@ -43,7 +44,7 @@ export type WorkerOutboundMessage =
| { | {
type: 'result' type: 'result'
id: number id: number
result?: unknown result?: SerializedResult<unknown>
error?: SerializedError error?: SerializedError
} }
@ -57,3 +58,68 @@ export type WorkerMessageHandler = (message: WorkerInboundMessage, respond: Resp
// eslint-disable-next-line @typescript-eslint/no-explicit-any // eslint-disable-next-line @typescript-eslint/no-explicit-any
export type WorkerCustomMethods = Record<string, (...args: any[]) => Promise<any>> export type WorkerCustomMethods = Record<string, (...args: any[]) => Promise<any>>
export type SerializedResult<T> = { __serialized__: T }
export function serializeResult<T>(result: T): SerializedResult<T> {
if (Array.isArray(result)) {
return result.map(serializeResult) as unknown as SerializedResult<T>
}
if (result instanceof Map) {
for (const [key, value] of result.entries()) {
result.set(key, serializeResult(value))
}
return result as unknown as SerializedResult<T>
}
if (result && typeof result === 'object') {
// replace Long instances with a special object
for (const [key, value] of Object.entries(result)) {
if (Long.isLong(value)) {
// eslint-disable-next-line
;(result as any)[key] = {
__type: 'long',
low: value.low,
high: value.high,
unsigned: value.unsigned,
}
} else {
// eslint-disable-next-line
;(result as any)[key] = serializeResult(value)
}
}
}
return result as unknown as SerializedResult<T>
}
export function deserializeResult<T>(result: SerializedResult<T>): T {
if (Array.isArray(result)) {
return result.map(deserializeResult) as unknown as T
}
if (result instanceof Map) {
for (const [key, value] of result.entries()) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
result.set(key, deserializeResult(value))
}
return result as unknown as T
}
if (result && typeof result === 'object') {
for (const [key, value] of Object.entries(result)) {
if (value && typeof value === 'object' && (value as Record<string, string>).__type === 'long') {
// eslint-disable-next-line
;(result as any)[key] = Long.fromValue(value as unknown as Long)
} else {
// eslint-disable-next-line
;(result as any)[key] = deserializeResult(value as SerializedResult<unknown>)
}
}
}
return result as unknown as T
}

View file

@ -1,6 +1,12 @@
import { BaseTelegramClient } from '../base.js' import { BaseTelegramClient } from '../base.js'
import { serializeError } from './errors.js' import { serializeError } from './errors.js'
import { RespondFn, WorkerCustomMethods, WorkerInboundMessage, WorkerMessageHandler } from './protocol.js' import {
RespondFn,
serializeResult,
WorkerCustomMethods,
WorkerInboundMessage,
WorkerMessageHandler,
} from './protocol.js'
export interface TelegramWorkerOptions<T extends WorkerCustomMethods> { export interface TelegramWorkerOptions<T extends WorkerCustomMethods> {
client: BaseTelegramClient client: BaseTelegramClient
@ -62,9 +68,9 @@ export abstract class TelegramWorker<T extends WorkerCustomMethods> {
client.onUpdate((update, peers) => client.onUpdate((update, peers) =>
this.broadcast({ this.broadcast({
type: 'update', type: 'update',
update, update: serializeResult(update),
users: peers.users, users: serializeResult(peers.users),
chats: peers.chats, chats: serializeResult(peers.chats),
hasMin: peers.hasMin, hasMin: peers.hasMin,
}), }),
) )
@ -72,7 +78,7 @@ export abstract class TelegramWorker<T extends WorkerCustomMethods> {
client.onServerUpdate((update) => client.onServerUpdate((update) =>
this.broadcast({ this.broadcast({
type: 'server_update', type: 'server_update',
update, update: serializeResult(update),
}), }),
) )
} }
@ -153,7 +159,7 @@ export abstract class TelegramWorker<T extends WorkerCustomMethods> {
respond({ respond({
type: 'result', type: 'result',
id: msg.id, id: msg.id,
result: res, result: serializeResult(res),
}) })
}) })
.catch((err) => { .catch((err) => {