From 134dc8371fd922aea70515702f4885ee7c873ba9 Mon Sep 17 00:00:00 2001 From: alina sireneva Date: Mon, 15 Jul 2024 18:43:35 +0300 Subject: [PATCH] fix(core): serializing Longs between worker and port --- e2e/deno/tests/05.worker.ts | 9 ++- e2e/node/ts/tests/05.worker.ts | 10 ++- packages/core/src/highlevel/worker/invoker.ts | 4 +- packages/core/src/highlevel/worker/port.ts | 8 +- .../core/src/highlevel/worker/protocol.ts | 76 +++++++++++++++++-- packages/core/src/highlevel/worker/worker.ts | 18 +++-- 6 files changed, 104 insertions(+), 21 deletions(-) diff --git a/e2e/deno/tests/05.worker.ts b/e2e/deno/tests/05.worker.ts index fe8ea4ce..d50bcf41 100644 --- a/e2e/deno/tests/05.worker.ts +++ b/e2e/deno/tests/05.worker.ts @@ -1,7 +1,7 @@ import { assertEquals, assertGreater, assertInstanceOf } from 'https://deno.land/std@0.223.0/assert/mod.ts' import { TelegramClient } from '@mtcute/core/client.js' -import { Message, TelegramWorkerPort, tl } from '@mtcute/deno' +import { Long, Message, TelegramWorkerPort, tl } from '@mtcute/deno' import { getApiParams, waitFor } from '../utils.ts' import type { CustomMethods } from './_worker.ts' @@ -18,8 +18,11 @@ Deno.test('5. worker', { sanitizeResources: false }, async (t) => { await t.step('should make api calls', async function () { const res = await port.call({ _: 'help.getConfig' }) - assertEquals(res._, 'config') + + const premiumPromo = await port.call({ _: 'help.getPremiumPromo' }) + // ensure Long-s are correctly serialized + assertEquals(Long.isLong((premiumPromo.users[0] as tl.RawUser).accessHash), true) }) await t.step('should call custom methods', async function () { @@ -47,6 +50,8 @@ Deno.test('5. worker', { sanitizeResources: false }, async (t) => { await port.startUpdatesLoop() const me = await portClient.getMe() + // ensure Long-s are correctly serialized + assertEquals(Long.isLong(me.raw.accessHash), true) let username = me.username if (!username) { diff --git a/e2e/node/ts/tests/05.worker.ts b/e2e/node/ts/tests/05.worker.ts index 5dcd33b4..b00cd4f8 100644 --- a/e2e/node/ts/tests/05.worker.ts +++ b/e2e/node/ts/tests/05.worker.ts @@ -5,7 +5,7 @@ import path from 'path' import { Worker } from 'worker_threads' import { TelegramClient } from '@mtcute/core/client.js' -import { Message, TelegramWorkerPort, tl } from '@mtcute/node' +import { Long, Message, TelegramWorkerPort, tl } from '@mtcute/node' import { getApiParams, waitFor } from '../utils.js' import type { CustomMethods } from './_worker.js' @@ -22,8 +22,11 @@ describe('5. worker', async function () { it('should make api calls', async function () { const res = await port.call({ _: 'help.getConfig' }) - expect(res._).to.equal('config') + + const premiumPromo = await port.call({ _: 'help.getPremiumPromo' }) + // ensure Long-s are correctly serialized + expect(Long.isLong((premiumPromo.users[0] as tl.RawUser).accessHash)).to.equal(true) }) it('should call custom methods', async function () { @@ -51,6 +54,9 @@ describe('5. worker', async function () { await port.startUpdatesLoop() const me = await portClient.getMe() + // ensure Long-s are correctly serialized + expect(Long.isLong(me.raw.accessHash)).equals(true) + let username = me.username if (!username) { diff --git a/packages/core/src/highlevel/worker/invoker.ts b/packages/core/src/highlevel/worker/invoker.ts index 9a5ecffb..4c1a08b9 100644 --- a/packages/core/src/highlevel/worker/invoker.ts +++ b/packages/core/src/highlevel/worker/invoker.ts @@ -1,6 +1,6 @@ import { ControllablePromise, createControllablePromise } from '../../utils/controllable-promise.js' import { deserializeError } from './errors.js' -import { SendFn, WorkerInboundMessage, WorkerOutboundMessage } from './protocol.js' +import { deserializeResult, SendFn, WorkerInboundMessage, WorkerOutboundMessage } from './protocol.js' export type InvokeTarget = Extract['target'] @@ -61,7 +61,7 @@ export class WorkerInvoker { if (msg.error) { promise.reject(deserializeError(msg.error)) } else { - promise.resolve(msg.result) + promise.resolve(deserializeResult(msg.result!)) } } diff --git a/packages/core/src/highlevel/worker/port.ts b/packages/core/src/highlevel/worker/port.ts index 116aff98..61637adb 100644 --- a/packages/core/src/highlevel/worker/port.ts +++ b/packages/core/src/highlevel/worker/port.ts @@ -8,7 +8,7 @@ import { PeersIndex } from '../types/peers/peers-index.js' import { RawUpdateHandler } from '../updates/types.js' import { AppConfigManagerProxy } from './app-config.js' import { WorkerInvoker } from './invoker.js' -import { ClientMessageHandler, SendFn, SomeWorker, WorkerCustomMethods } from './protocol.js' +import { ClientMessageHandler, deserializeResult, SendFn, SomeWorker, WorkerCustomMethods } from './protocol.js' import { TelegramStorageProxy } from './storage.js' export interface TelegramWorkerPortOptions { @@ -129,15 +129,15 @@ export abstract class TelegramWorkerPort imp this.log.handler(message.color, message.level, message.tag, message.fmt, message.args) break case 'server_update': - this._serverUpdatesHandler(message.update) + this._serverUpdatesHandler(deserializeResult(message.update)) break case 'conn_state': this._connectionStateHandler(message.state) break case 'update': { - const peers = new PeersIndex(message.users, message.chats) + const peers = new PeersIndex(deserializeResult(message.users), deserializeResult(message.chats)) peers.hasMin = message.hasMin - this._updateHandler(message.update, peers) + this._updateHandler(deserializeResult(message.update), peers) break } case 'result': diff --git a/packages/core/src/highlevel/worker/protocol.ts b/packages/core/src/highlevel/worker/protocol.ts index 5837959a..345e33d3 100644 --- a/packages/core/src/highlevel/worker/protocol.ts +++ b/packages/core/src/highlevel/worker/protocol.ts @@ -1,3 +1,4 @@ +import Long from 'long' import type { Worker as NodeWorker } from 'node:worker_threads' import { tl } from '@mtcute/tl' @@ -21,12 +22,12 @@ export type WorkerInboundMessage = } export type WorkerOutboundMessage = - | { type: 'server_update'; update: tl.TypeUpdates } + | { type: 'server_update'; update: SerializedResult } | { type: 'update' - update: tl.TypeUpdate - users: Map - chats: Map + update: SerializedResult + users: SerializedResult> + chats: SerializedResult> hasMin: boolean } | { type: 'error'; error: unknown } @@ -43,7 +44,7 @@ export type WorkerOutboundMessage = | { type: 'result' id: number - result?: unknown + result?: SerializedResult error?: SerializedError } @@ -57,3 +58,68 @@ export type WorkerMessageHandler = (message: WorkerInboundMessage, respond: Resp // eslint-disable-next-line @typescript-eslint/no-explicit-any export type WorkerCustomMethods = Record Promise> + +export type SerializedResult = { __serialized__: T } + +export function serializeResult(result: T): SerializedResult { + if (Array.isArray(result)) { + return result.map(serializeResult) as unknown as SerializedResult + } + + if (result instanceof Map) { + for (const [key, value] of result.entries()) { + result.set(key, serializeResult(value)) + } + + return result as unknown as SerializedResult + } + + if (result && typeof result === 'object') { + // replace Long instances with a special object + for (const [key, value] of Object.entries(result)) { + if (Long.isLong(value)) { + // eslint-disable-next-line + ;(result as any)[key] = { + __type: 'long', + low: value.low, + high: value.high, + unsigned: value.unsigned, + } + } else { + // eslint-disable-next-line + ;(result as any)[key] = serializeResult(value) + } + } + } + + return result as unknown as SerializedResult +} + +export function deserializeResult(result: SerializedResult): T { + if (Array.isArray(result)) { + return result.map(deserializeResult) as unknown as T + } + + if (result instanceof Map) { + for (const [key, value] of result.entries()) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + result.set(key, deserializeResult(value)) + } + + return result as unknown as T + } + + if (result && typeof result === 'object') { + for (const [key, value] of Object.entries(result)) { + if (value && typeof value === 'object' && (value as Record).__type === 'long') { + // eslint-disable-next-line + ;(result as any)[key] = Long.fromValue(value as unknown as Long) + } else { + // eslint-disable-next-line + ;(result as any)[key] = deserializeResult(value as SerializedResult) + } + } + } + + return result as unknown as T +} diff --git a/packages/core/src/highlevel/worker/worker.ts b/packages/core/src/highlevel/worker/worker.ts index 404786d4..7d8ea5be 100644 --- a/packages/core/src/highlevel/worker/worker.ts +++ b/packages/core/src/highlevel/worker/worker.ts @@ -1,6 +1,12 @@ import { BaseTelegramClient } from '../base.js' import { serializeError } from './errors.js' -import { RespondFn, WorkerCustomMethods, WorkerInboundMessage, WorkerMessageHandler } from './protocol.js' +import { + RespondFn, + serializeResult, + WorkerCustomMethods, + WorkerInboundMessage, + WorkerMessageHandler, +} from './protocol.js' export interface TelegramWorkerOptions { client: BaseTelegramClient @@ -62,9 +68,9 @@ export abstract class TelegramWorker { client.onUpdate((update, peers) => this.broadcast({ type: 'update', - update, - users: peers.users, - chats: peers.chats, + update: serializeResult(update), + users: serializeResult(peers.users), + chats: serializeResult(peers.chats), hasMin: peers.hasMin, }), ) @@ -72,7 +78,7 @@ export abstract class TelegramWorker { client.onServerUpdate((update) => this.broadcast({ type: 'server_update', - update, + update: serializeResult(update), }), ) } @@ -153,7 +159,7 @@ export abstract class TelegramWorker { respond({ type: 'result', id: msg.id, - result: res, + result: serializeResult(res), }) }) .catch((err) => {