fix(core): serializing Longs between worker and port #63
6 changed files with 104 additions and 21 deletions
|
@ -1,7 +1,7 @@
|
||||||
import { assertEquals, assertGreater, assertInstanceOf } from 'https://deno.land/std@0.223.0/assert/mod.ts'
|
import { assertEquals, assertGreater, assertInstanceOf } from 'https://deno.land/std@0.223.0/assert/mod.ts'
|
||||||
|
|
||||||
import { TelegramClient } from '@mtcute/core/client.js'
|
import { TelegramClient } from '@mtcute/core/client.js'
|
||||||
import { Message, TelegramWorkerPort, tl } from '@mtcute/deno'
|
import { Long, Message, TelegramWorkerPort, tl } from '@mtcute/deno'
|
||||||
|
|
||||||
import { getApiParams, waitFor } from '../utils.ts'
|
import { getApiParams, waitFor } from '../utils.ts'
|
||||||
import type { CustomMethods } from './_worker.ts'
|
import type { CustomMethods } from './_worker.ts'
|
||||||
|
@ -18,8 +18,11 @@ Deno.test('5. worker', { sanitizeResources: false }, async (t) => {
|
||||||
|
|
||||||
await t.step('should make api calls', async function () {
|
await t.step('should make api calls', async function () {
|
||||||
const res = await port.call({ _: 'help.getConfig' })
|
const res = await port.call({ _: 'help.getConfig' })
|
||||||
|
|
||||||
assertEquals(res._, 'config')
|
assertEquals(res._, 'config')
|
||||||
|
|
||||||
|
const premiumPromo = await port.call({ _: 'help.getPremiumPromo' })
|
||||||
|
// ensure Long-s are correctly serialized
|
||||||
|
assertEquals(Long.isLong((premiumPromo.users[0] as tl.RawUser).accessHash), true)
|
||||||
})
|
})
|
||||||
|
|
||||||
await t.step('should call custom methods', async function () {
|
await t.step('should call custom methods', async function () {
|
||||||
|
@ -47,6 +50,8 @@ Deno.test('5. worker', { sanitizeResources: false }, async (t) => {
|
||||||
await port.startUpdatesLoop()
|
await port.startUpdatesLoop()
|
||||||
|
|
||||||
const me = await portClient.getMe()
|
const me = await portClient.getMe()
|
||||||
|
// ensure Long-s are correctly serialized
|
||||||
|
assertEquals(Long.isLong(me.raw.accessHash), true)
|
||||||
let username = me.username
|
let username = me.username
|
||||||
|
|
||||||
if (!username) {
|
if (!username) {
|
||||||
|
|
|
@ -5,7 +5,7 @@ import path from 'path'
|
||||||
import { Worker } from 'worker_threads'
|
import { Worker } from 'worker_threads'
|
||||||
|
|
||||||
import { TelegramClient } from '@mtcute/core/client.js'
|
import { TelegramClient } from '@mtcute/core/client.js'
|
||||||
import { Message, TelegramWorkerPort, tl } from '@mtcute/node'
|
import { Long, Message, TelegramWorkerPort, tl } from '@mtcute/node'
|
||||||
|
|
||||||
import { getApiParams, waitFor } from '../utils.js'
|
import { getApiParams, waitFor } from '../utils.js'
|
||||||
import type { CustomMethods } from './_worker.js'
|
import type { CustomMethods } from './_worker.js'
|
||||||
|
@ -22,8 +22,11 @@ describe('5. worker', async function () {
|
||||||
|
|
||||||
it('should make api calls', async function () {
|
it('should make api calls', async function () {
|
||||||
const res = await port.call({ _: 'help.getConfig' })
|
const res = await port.call({ _: 'help.getConfig' })
|
||||||
|
|
||||||
expect(res._).to.equal('config')
|
expect(res._).to.equal('config')
|
||||||
|
|
||||||
|
const premiumPromo = await port.call({ _: 'help.getPremiumPromo' })
|
||||||
|
// ensure Long-s are correctly serialized
|
||||||
|
expect(Long.isLong((premiumPromo.users[0] as tl.RawUser).accessHash)).to.equal(true)
|
||||||
})
|
})
|
||||||
|
|
||||||
it('should call custom methods', async function () {
|
it('should call custom methods', async function () {
|
||||||
|
@ -51,6 +54,9 @@ describe('5. worker', async function () {
|
||||||
await port.startUpdatesLoop()
|
await port.startUpdatesLoop()
|
||||||
|
|
||||||
const me = await portClient.getMe()
|
const me = await portClient.getMe()
|
||||||
|
// ensure Long-s are correctly serialized
|
||||||
|
expect(Long.isLong(me.raw.accessHash)).equals(true)
|
||||||
|
|
||||||
let username = me.username
|
let username = me.username
|
||||||
|
|
||||||
if (!username) {
|
if (!username) {
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
import { ControllablePromise, createControllablePromise } from '../../utils/controllable-promise.js'
|
import { ControllablePromise, createControllablePromise } from '../../utils/controllable-promise.js'
|
||||||
import { deserializeError } from './errors.js'
|
import { deserializeError } from './errors.js'
|
||||||
import { SendFn, WorkerInboundMessage, WorkerOutboundMessage } from './protocol.js'
|
import { deserializeResult, SendFn, WorkerInboundMessage, WorkerOutboundMessage } from './protocol.js'
|
||||||
|
|
||||||
export type InvokeTarget = Extract<WorkerInboundMessage, { type: 'invoke' }>['target']
|
export type InvokeTarget = Extract<WorkerInboundMessage, { type: 'invoke' }>['target']
|
||||||
|
|
||||||
|
@ -61,7 +61,7 @@ export class WorkerInvoker {
|
||||||
if (msg.error) {
|
if (msg.error) {
|
||||||
promise.reject(deserializeError(msg.error))
|
promise.reject(deserializeError(msg.error))
|
||||||
} else {
|
} else {
|
||||||
promise.resolve(msg.result)
|
promise.resolve(deserializeResult(msg.result!))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,7 @@ import { PeersIndex } from '../types/peers/peers-index.js'
|
||||||
import { RawUpdateHandler } from '../updates/types.js'
|
import { RawUpdateHandler } from '../updates/types.js'
|
||||||
import { AppConfigManagerProxy } from './app-config.js'
|
import { AppConfigManagerProxy } from './app-config.js'
|
||||||
import { WorkerInvoker } from './invoker.js'
|
import { WorkerInvoker } from './invoker.js'
|
||||||
import { ClientMessageHandler, SendFn, SomeWorker, WorkerCustomMethods } from './protocol.js'
|
import { ClientMessageHandler, deserializeResult, SendFn, SomeWorker, WorkerCustomMethods } from './protocol.js'
|
||||||
import { TelegramStorageProxy } from './storage.js'
|
import { TelegramStorageProxy } from './storage.js'
|
||||||
|
|
||||||
export interface TelegramWorkerPortOptions {
|
export interface TelegramWorkerPortOptions {
|
||||||
|
@ -129,15 +129,15 @@ export abstract class TelegramWorkerPort<Custom extends WorkerCustomMethods> imp
|
||||||
this.log.handler(message.color, message.level, message.tag, message.fmt, message.args)
|
this.log.handler(message.color, message.level, message.tag, message.fmt, message.args)
|
||||||
break
|
break
|
||||||
case 'server_update':
|
case 'server_update':
|
||||||
this._serverUpdatesHandler(message.update)
|
this._serverUpdatesHandler(deserializeResult(message.update))
|
||||||
break
|
break
|
||||||
case 'conn_state':
|
case 'conn_state':
|
||||||
this._connectionStateHandler(message.state)
|
this._connectionStateHandler(message.state)
|
||||||
break
|
break
|
||||||
case 'update': {
|
case 'update': {
|
||||||
const peers = new PeersIndex(message.users, message.chats)
|
const peers = new PeersIndex(deserializeResult(message.users), deserializeResult(message.chats))
|
||||||
peers.hasMin = message.hasMin
|
peers.hasMin = message.hasMin
|
||||||
this._updateHandler(message.update, peers)
|
this._updateHandler(deserializeResult(message.update), peers)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
case 'result':
|
case 'result':
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import Long from 'long'
|
||||||
import type { Worker as NodeWorker } from 'node:worker_threads'
|
import type { Worker as NodeWorker } from 'node:worker_threads'
|
||||||
|
|
||||||
import { tl } from '@mtcute/tl'
|
import { tl } from '@mtcute/tl'
|
||||||
|
@ -21,12 +22,12 @@ export type WorkerInboundMessage =
|
||||||
}
|
}
|
||||||
|
|
||||||
export type WorkerOutboundMessage =
|
export type WorkerOutboundMessage =
|
||||||
| { type: 'server_update'; update: tl.TypeUpdates }
|
| { type: 'server_update'; update: SerializedResult<tl.TypeUpdates> }
|
||||||
| {
|
| {
|
||||||
type: 'update'
|
type: 'update'
|
||||||
update: tl.TypeUpdate
|
update: SerializedResult<tl.TypeUpdate>
|
||||||
users: Map<number, tl.TypeUser>
|
users: SerializedResult<Map<number, tl.TypeUser>>
|
||||||
chats: Map<number, tl.TypeChat>
|
chats: SerializedResult<Map<number, tl.TypeChat>>
|
||||||
hasMin: boolean
|
hasMin: boolean
|
||||||
}
|
}
|
||||||
| { type: 'error'; error: unknown }
|
| { type: 'error'; error: unknown }
|
||||||
|
@ -43,7 +44,7 @@ export type WorkerOutboundMessage =
|
||||||
| {
|
| {
|
||||||
type: 'result'
|
type: 'result'
|
||||||
id: number
|
id: number
|
||||||
result?: unknown
|
result?: SerializedResult<unknown>
|
||||||
error?: SerializedError
|
error?: SerializedError
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -57,3 +58,68 @@ export type WorkerMessageHandler = (message: WorkerInboundMessage, respond: Resp
|
||||||
|
|
||||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||||
export type WorkerCustomMethods = Record<string, (...args: any[]) => Promise<any>>
|
export type WorkerCustomMethods = Record<string, (...args: any[]) => Promise<any>>
|
||||||
|
|
||||||
|
export type SerializedResult<T> = { __serialized__: T }
|
||||||
|
|
||||||
|
export function serializeResult<T>(result: T): SerializedResult<T> {
|
||||||
|
if (Array.isArray(result)) {
|
||||||
|
return result.map(serializeResult) as unknown as SerializedResult<T>
|
||||||
|
}
|
||||||
|
|
||||||
|
if (result instanceof Map) {
|
||||||
|
for (const [key, value] of result.entries()) {
|
||||||
|
result.set(key, serializeResult(value))
|
||||||
|
}
|
||||||
|
|
||||||
|
return result as unknown as SerializedResult<T>
|
||||||
|
}
|
||||||
|
|
||||||
|
if (result && typeof result === 'object') {
|
||||||
|
// replace Long instances with a special object
|
||||||
|
for (const [key, value] of Object.entries(result)) {
|
||||||
|
if (Long.isLong(value)) {
|
||||||
|
// eslint-disable-next-line
|
||||||
|
;(result as any)[key] = {
|
||||||
|
__type: 'long',
|
||||||
|
low: value.low,
|
||||||
|
high: value.high,
|
||||||
|
unsigned: value.unsigned,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// eslint-disable-next-line
|
||||||
|
;(result as any)[key] = serializeResult(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result as unknown as SerializedResult<T>
|
||||||
|
}
|
||||||
|
|
||||||
|
export function deserializeResult<T>(result: SerializedResult<T>): T {
|
||||||
|
if (Array.isArray(result)) {
|
||||||
|
return result.map(deserializeResult) as unknown as T
|
||||||
|
}
|
||||||
|
|
||||||
|
if (result instanceof Map) {
|
||||||
|
for (const [key, value] of result.entries()) {
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
|
||||||
|
result.set(key, deserializeResult(value))
|
||||||
|
}
|
||||||
|
|
||||||
|
return result as unknown as T
|
||||||
|
}
|
||||||
|
|
||||||
|
if (result && typeof result === 'object') {
|
||||||
|
for (const [key, value] of Object.entries(result)) {
|
||||||
|
if (value && typeof value === 'object' && (value as Record<string, string>).__type === 'long') {
|
||||||
|
// eslint-disable-next-line
|
||||||
|
;(result as any)[key] = Long.fromValue(value as unknown as Long)
|
||||||
|
} else {
|
||||||
|
// eslint-disable-next-line
|
||||||
|
;(result as any)[key] = deserializeResult(value as SerializedResult<unknown>)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result as unknown as T
|
||||||
|
}
|
||||||
|
|
|
@ -1,6 +1,12 @@
|
||||||
import { BaseTelegramClient } from '../base.js'
|
import { BaseTelegramClient } from '../base.js'
|
||||||
import { serializeError } from './errors.js'
|
import { serializeError } from './errors.js'
|
||||||
import { RespondFn, WorkerCustomMethods, WorkerInboundMessage, WorkerMessageHandler } from './protocol.js'
|
import {
|
||||||
|
RespondFn,
|
||||||
|
serializeResult,
|
||||||
|
WorkerCustomMethods,
|
||||||
|
WorkerInboundMessage,
|
||||||
|
WorkerMessageHandler,
|
||||||
|
} from './protocol.js'
|
||||||
|
|
||||||
export interface TelegramWorkerOptions<T extends WorkerCustomMethods> {
|
export interface TelegramWorkerOptions<T extends WorkerCustomMethods> {
|
||||||
client: BaseTelegramClient
|
client: BaseTelegramClient
|
||||||
|
@ -62,9 +68,9 @@ export abstract class TelegramWorker<T extends WorkerCustomMethods> {
|
||||||
client.onUpdate((update, peers) =>
|
client.onUpdate((update, peers) =>
|
||||||
this.broadcast({
|
this.broadcast({
|
||||||
type: 'update',
|
type: 'update',
|
||||||
update,
|
update: serializeResult(update),
|
||||||
users: peers.users,
|
users: serializeResult(peers.users),
|
||||||
chats: peers.chats,
|
chats: serializeResult(peers.chats),
|
||||||
hasMin: peers.hasMin,
|
hasMin: peers.hasMin,
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
@ -72,7 +78,7 @@ export abstract class TelegramWorker<T extends WorkerCustomMethods> {
|
||||||
client.onServerUpdate((update) =>
|
client.onServerUpdate((update) =>
|
||||||
this.broadcast({
|
this.broadcast({
|
||||||
type: 'server_update',
|
type: 'server_update',
|
||||||
update,
|
update: serializeResult(update),
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
@ -153,7 +159,7 @@ export abstract class TelegramWorker<T extends WorkerCustomMethods> {
|
||||||
respond({
|
respond({
|
||||||
type: 'result',
|
type: 'result',
|
||||||
id: msg.id,
|
id: msg.id,
|
||||||
result: res,
|
result: serializeResult(res),
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
.catch((err) => {
|
.catch((err) => {
|
||||||
|
|
Loading…
Reference in a new issue