From 8249bd2ea46315095f22fe616754eef8d61b39e7 Mon Sep 17 00:00:00 2001 From: Alina Sireneva Date: Thu, 23 Nov 2023 19:11:00 +0300 Subject: [PATCH] feat(client)!: batched queries breaking: getUsers may now return null --- packages/client/src/client.ts | 46 +- .../src/methods/chats/batched-queries.ts | 122 ++++ packages/client/src/methods/chats/get-chat.ts | 25 +- .../src/methods/users/get-users.test.ts | 4 +- .../client/src/methods/users/get-users.ts | 22 +- .../client/src/methods/users/resolve-peer.ts | 2 +- .../client/src/utils/query-batcher.test.ts | 607 ++++++++++++++++++ packages/client/src/utils/query-batcher.ts | 222 +++++++ 8 files changed, 998 insertions(+), 52 deletions(-) create mode 100644 packages/client/src/methods/chats/batched-queries.ts create mode 100644 packages/client/src/utils/query-batcher.test.ts create mode 100644 packages/client/src/utils/query-batcher.ts diff --git a/packages/client/src/client.ts b/packages/client/src/client.ts index 86a2b657..0a1d58c0 100644 --- a/packages/client/src/client.ts +++ b/packages/client/src/client.ts @@ -4224,7 +4224,7 @@ export interface TelegramClient extends BaseTelegramClient { /** * Boost a given channel * - * **Available**: ✅ both users and bots + * **Available**: 👤 users only * * @param peerId Peer ID to boost */ @@ -4232,7 +4232,7 @@ export interface TelegramClient extends BaseTelegramClient { /** * Check if the current user can apply boost to a given channel * - * **Available**: ✅ both users and bots + * **Available**: 👤 users only * * @param peerId Peer ID whose stories to fetch * @returns @@ -4248,7 +4248,7 @@ export interface TelegramClient extends BaseTelegramClient { /** * Check if the current user can post stories as a given peer * - * **Available**: ✅ both users and bots + * **Available**: 👤 users only * * @param peerId Peer ID whose stories to fetch * @returns @@ -4260,7 +4260,7 @@ export interface TelegramClient extends BaseTelegramClient { /** * Delete a story * - * **Available**: ✅ both 
users and bots + * **Available**: 👤 users only * * @returns IDs of stories that were removed */ @@ -4280,7 +4280,7 @@ export interface TelegramClient extends BaseTelegramClient { /** * Edit a sent story * - * **Available**: ✅ both users and bots + * **Available**: 👤 users only * * @returns Edited story */ @@ -4321,7 +4321,7 @@ export interface TelegramClient extends BaseTelegramClient { }): Promise /** * Get all stories (e.g. to load the top bar) - * **Available**: ✅ both users and bots + * **Available**: 👤 users only * */ getAllStories(params?: { @@ -4370,14 +4370,14 @@ export interface TelegramClient extends BaseTelegramClient { /** * Get stories of a given peer * - * **Available**: ✅ both users and bots + * **Available**: 👤 users only * * @param peerId Peer ID whose stories to fetch */ getPeerStories(peerId: InputPeerLike): Promise /** * Get profile stories - * **Available**: ✅ both users and bots + * **Available**: 👤 users only * */ getProfileStories( @@ -4418,7 +4418,7 @@ export interface TelegramClient extends BaseTelegramClient { * Get brief information about stories interactions. * * The result will be in the same order as the input IDs - * **Available**: ✅ both users and bots + * **Available**: 👤 users only * */ getStoriesInteractions(peerId: InputPeerLike, storyIds: MaybeArray): Promise @@ -4429,13 +4429,13 @@ export interface TelegramClient extends BaseTelegramClient { * and if the user doesn't have a username, `USER_PUBLIC_MISSING` is thrown. 
* * I have no idea why is this an RPC call, but whatever - * **Available**: ✅ both users and bots + * **Available**: 👤 users only * */ getStoryLink(peerId: InputPeerLike, storyId: number): Promise /** * Get viewers list of a story - * **Available**: ✅ both users and bots + * **Available**: 👤 users only * */ getStoryViewers( @@ -4478,7 +4478,7 @@ export interface TelegramClient extends BaseTelegramClient { * Hide own stories views (activate so called "stealth mode") * * Currently has a cooldown of 1 hour, and throws FLOOD_WAIT error if it is on cooldown. - * **Available**: ✅ both users and bots + * **Available**: 👤 users only * */ hideMyStoriesViews(params?: { @@ -4502,7 +4502,7 @@ export interface TelegramClient extends BaseTelegramClient { * This should be used for pinned stories, as they can't * be marked as read when the user sees them ({@link Story#isActive} == false) * - * **Available**: ✅ both users and bots + * **Available**: 👤 users only * * @param peerId Peer ID whose stories to mark as read * @param ids ID(s) of the stories to increment views of (max 200) @@ -4608,7 +4608,7 @@ export interface TelegramClient extends BaseTelegramClient { * * This should only be used for "active" stories ({@link Story#isActive} == false) * - * **Available**: ✅ both users and bots + * **Available**: 👤 users only * * @param peerId Peer ID whose stories to mark as read * @returns IDs of the stores that were marked as read @@ -4616,7 +4616,7 @@ export interface TelegramClient extends BaseTelegramClient { readStories(peerId: InputPeerLike, maxId: number): Promise /** * Report a story (or multiple stories) to the moderation team - * **Available**: ✅ both users and bots + * **Available**: 👤 users only * */ reportStory( @@ -4638,7 +4638,7 @@ export interface TelegramClient extends BaseTelegramClient { ): Promise /** * Send (or remove) a reaction to a story - * **Available**: ✅ both users and bots + * **Available**: 👤 users only * */ sendStoryReaction(params: { @@ -4653,7 +4653,7 @@ 
export interface TelegramClient extends BaseTelegramClient { /** * Send a story * - * **Available**: ✅ both users and bots + * **Available**: 👤 users only * * @returns Created story */ @@ -4711,14 +4711,14 @@ export interface TelegramClient extends BaseTelegramClient { * Toggle whether peer's stories are archived (hidden) or not. * * This **does not** archive the chat with that peer, only stories. - * **Available**: ✅ both users and bots + * **Available**: 👤 users only * */ togglePeerStoriesArchived(peerId: InputPeerLike, archived: boolean): Promise /** * Toggle one or more stories pinned status * - * **Available**: ✅ both users and bots + * **Available**: 👤 users only * * @returns IDs of stories that were toggled */ @@ -4829,7 +4829,7 @@ export interface TelegramClient extends BaseTelegramClient { /** * Edit "close friends" list directly using user IDs * - * **Available**: ✅ both users and bots + * **Available**: 👤 users only * * @param ids User IDs */ @@ -4837,7 +4837,7 @@ export interface TelegramClient extends BaseTelegramClient { /** * Edit "close friends" list using `InputPeerLike`s * - * **Available**: ✅ both users and bots + * **Available**: 👤 users only * * @param ids User IDs */ @@ -4918,7 +4918,7 @@ export interface TelegramClient extends BaseTelegramClient { * * @param ids Users' identifiers. Can be ID, username, phone number, `"me"`, `"self"` or TL object */ - getUsers(ids: MaybeArray): Promise + getUsers(ids: MaybeArray): Promise<(User | null)[]> /** * Iterate over profile photos * @@ -4975,7 +4975,7 @@ export interface TelegramClient extends BaseTelegramClient { * **Available**: ✅ both users and bots * * @param peerId The peer identifier that you want to extract the `InputPeer` from. 
- * @param [force=false] Whether to force re-fetch the peer from the server + * @param [force=false] Whether to force re-fetch the peer from the server (only for usernames and phone numbers) */ resolvePeer(peerId: InputPeerLike, force?: boolean): Promise /** diff --git a/packages/client/src/methods/chats/batched-queries.ts b/packages/client/src/methods/chats/batched-queries.ts new file mode 100644 index 00000000..90acdadf --- /dev/null +++ b/packages/client/src/methods/chats/batched-queries.ts @@ -0,0 +1,122 @@ +import { MtArgumentError, tl } from '@mtcute/core' + +import { batchedQuery } from '../../utils/query-batcher.js' +import { getAuthState } from '../auth/_state.js' + +/** @internal */ +export const _getUsersBatched = batchedQuery({ + fetch: (client, items) => + client + .call({ + _: 'users.getUsers', + id: items, + // there's actually not much point in filtering, since telegram currently simply omits the missing users + // but maybe it will change in the future and i don't want to think about it + }) + .then((res) => res.filter((it) => it._ !== 'userEmpty')), + inputKey: (item, client) => { + switch (item._) { + case 'inputUser': + case 'inputUserFromMessage': + return item.userId + case 'inputUserSelf': + return getAuthState(client).userId! 
+ default: + throw new MtArgumentError('Invalid input user') + } + }, + outputKey: (item) => item.id, + maxBatchSize: 50, + maxConcurrent: 3, + retrySingleOnError: (items, err) => { + if (!tl.RpcError.is(err)) return false + + let fromMessageCount = 0 + + for (const item of items) { + if (item._ === 'inputUserFromMessage') fromMessageCount++ + } + + if (fromMessageCount === 0) { + // no point in retrying, the error is likely not related to the inputUserFromMessage + return false + } + + switch (err.text) { + case 'CHANNEL_INVALID': + case 'USER_BANNED_IN_CHANNEL': + case 'CHANNEL_PRIVATE': + case 'MSG_ID_INVALID': + return true + default: + return false + } + }, +}) + +/** @internal */ +export const _getChatsBatched = batchedQuery({ + fetch: (client, items) => + client + .call({ + _: 'messages.getChats', + id: items, + }) + .then((res) => res.chats.filter((it): it is tl.RawChat => it._ === 'chat')), + inputKey: (id) => id, + outputKey: (item) => item.id, + maxBatchSize: 50, + maxConcurrent: 3, +}) + +/** @internal */ +export const _getChannelsBatched = batchedQuery({ + fetch: (client, items) => + client + .call({ + _: 'channels.getChannels', + id: items, + }) + .then((res) => + res.chats.filter( + (it): it is tl.RawChannel | tl.RawChannelForbidden => + it._ === 'channel' || it._ === 'channelForbidden', + ), + ), + inputKey: (id) => { + switch (id._) { + case 'inputChannel': + case 'inputChannelFromMessage': + return id.channelId + default: + throw new MtArgumentError('Invalid input channel') + } + }, + outputKey: (item) => item.id, + maxBatchSize: 50, + maxConcurrent: 3, + retrySingleOnError: (items, err) => { + if (!tl.RpcError.is(err)) return false + + let fromMessageCount = 0 + + for (const item of items) { + if (item._ === 'inputChannelFromMessage') fromMessageCount++ + } + + if (fromMessageCount === 0) { + // no point in retrying, the error is likely not related to the inputChannelFromMessage + return false + } + + switch (err.text) { + case 'CHANNEL_INVALID': 
+ case 'USER_BANNED_IN_CHANNEL': + case 'CHANNEL_PRIVATE': + case 'MSG_ID_INVALID': + return true + default: + return false + } + }, +}) diff --git a/packages/client/src/methods/chats/get-chat.ts b/packages/client/src/methods/chats/get-chat.ts index 67c1aba1..ca7eeb11 100644 --- a/packages/client/src/methods/chats/get-chat.ts +++ b/packages/client/src/methods/chats/get-chat.ts @@ -1,6 +1,6 @@ import { BaseTelegramClient, MtArgumentError, tl } from '@mtcute/core' -import { Chat, InputPeerLike } from '../../types/index.js' +import { Chat, InputPeerLike, MtPeerNotFoundError } from '../../types/index.js' import { INVITE_LINK_REGEX, isInputPeerChannel, @@ -10,6 +10,7 @@ import { normalizeToInputUser, } from '../../utils/peer-utils.js' import { resolvePeer } from '../users/resolve-peer.js' +import { _getChannelsBatched, _getChatsBatched, _getUsersBatched } from './batched-queries.js' // @available=both /** @@ -40,26 +41,16 @@ export async function getChat(client: BaseTelegramClient, chatId: InputPeerLike) const peer = await resolvePeer(client, chatId) - let res: tl.TypeChat | tl.TypeUser + let res: tl.TypeChat | tl.TypeUser | null if (isInputPeerChannel(peer)) { - const r = await client.call({ - _: 'channels.getChannels', - id: [normalizeToInputChannel(peer)], - }) - res = r.chats[0] + res = await _getChannelsBatched(client, normalizeToInputChannel(peer)) } else if (isInputPeerUser(peer)) { - const r = await client.call({ - _: 'users.getUsers', - id: [normalizeToInputUser(peer)], - }) - res = r[0] + res = await _getUsersBatched(client, normalizeToInputUser(peer)) } else if (isInputPeerChat(peer)) { - const r = await client.call({ - _: 'messages.getChats', - id: [peer.chatId], - }) - res = r.chats[0] + res = await _getChatsBatched(client, peer.chatId) } else throw new Error('should not happen') + if (!res) throw new MtPeerNotFoundError(`Chat ${JSON.stringify(chatId)} was not found`) + return new Chat(res) } diff --git a/packages/client/src/methods/users/get-users.test.ts 
b/packages/client/src/methods/users/get-users.test.ts index 14fc5063..3c02fa6b 100644 --- a/packages/client/src/methods/users/get-users.test.ts +++ b/packages/client/src/methods/users/get-users.test.ts @@ -31,7 +31,7 @@ describe('getUsers', () => { expect(await getUsers(client, 123)).toEqual([new User(createStub('user', { id: 123, accessHash: Long.ZERO }))]) }) - it('should ignore userEmpty', async () => { - expect(await getUsers(client, 1)).toEqual([]) + it('should return null for userEmpty', async () => { + expect(await getUsers(client, 1)).toEqual([null]) }) }) diff --git a/packages/client/src/methods/users/get-users.ts b/packages/client/src/methods/users/get-users.ts index 5e08d969..c4b749e4 100644 --- a/packages/client/src/methods/users/get-users.ts +++ b/packages/client/src/methods/users/get-users.ts @@ -2,6 +2,8 @@ import { BaseTelegramClient, MaybeArray } from '@mtcute/core' import { InputPeerLike, User } from '../../types/index.js' import { normalizeToInputUser } from '../../utils/peer-utils.js' +import { _getUsersBatched } from '../chats/batched-queries.js' +import { resolvePeer } from './resolve-peer.js' import { resolvePeerMany } from './resolve-peer-many.js' /** @@ -12,16 +14,18 @@ import { resolvePeerMany } from './resolve-peer-many.js' * * @param ids Users' identifiers. Can be ID, username, phone number, `"me"`, `"self"` or TL object */ -export async function getUsers(client: BaseTelegramClient, ids: MaybeArray): Promise { - const isArray = Array.isArray(ids) - if (!isArray) ids = [ids as InputPeerLike] +export async function getUsers(client: BaseTelegramClient, ids: MaybeArray): Promise<(User | null)[]> { + if (!Array.isArray(ids)) { + // avoid unnecessary overhead of Promise.all and resolvePeerMany + const res = await _getUsersBatched(client, normalizeToInputUser(await resolvePeer(client, ids))) - const inputPeers = await resolvePeerMany(client, ids as InputPeerLike[], normalizeToInputUser) + return [res ? 
new User(res) : null] + } - const res = await client.call({ - _: 'users.getUsers', - id: inputPeers, - }) + const inputPeers = await resolvePeerMany(client, ids, normalizeToInputUser) - return res.filter((it) => it._ !== 'userEmpty').map((it) => new User(it)) + // pooling will be done by the helper + const res = await Promise.all(inputPeers.map((peer) => _getUsersBatched(client, peer))) + + return res.map((it) => (it ? new User(it) : null)) } diff --git a/packages/client/src/methods/users/resolve-peer.ts b/packages/client/src/methods/users/resolve-peer.ts index f79c007c..7311d712 100644 --- a/packages/client/src/methods/users/resolve-peer.ts +++ b/packages/client/src/methods/users/resolve-peer.ts @@ -18,7 +18,7 @@ import { normalizeToInputPeer } from '../../utils/peer-utils.js' * Useful when an `InputPeer` is needed in Raw API. * * @param peerId The peer identifier that you want to extract the `InputPeer` from. - * @param force Whether to force re-fetch the peer from the server (only applicable for usernames and phone numbers) + * @param force Whether to force re-fetch the peer from the server (only for usernames and phone numbers) */ export async function resolvePeer( client: BaseTelegramClient, diff --git a/packages/client/src/utils/query-batcher.test.ts b/packages/client/src/utils/query-batcher.test.ts new file mode 100644 index 00000000..f92f1450 --- /dev/null +++ b/packages/client/src/utils/query-batcher.test.ts @@ -0,0 +1,607 @@ +import { describe, expect, it } from 'vitest' + +import { BaseTelegramClient } from '@mtcute/core' +import { sleep } from '@mtcute/core/utils.js' +import { StubTelegramClient } from '@mtcute/test' + +import { batchedQuery } from './query-batcher.js' + +describe('batchedQuery', () => { + const client = new StubTelegramClient() + client.log.prefix = '[1]' + + it('should correctly batch requests', async () => { + const log: string[] = [] + + const fetch = async (client: BaseTelegramClient, items: number[]) => { + log.push(`[start] 
fetch() ${items.join(', ')}`) + + await sleep(10) + + log.push(`[end] fetch() ${items.join(', ')}`) + + return items.map((it) => it * 2) + } + + const batched = batchedQuery({ + fetch, + inputKey: (it) => it, + outputKey: (it) => it / 2, + }) + + const batchedWrapped = async (item: number) => { + log.push(`[start] batched() ${item}`) + + const res = await batched(client, item) + + log.push(`[end] batched() ${item} => ${res}`) + + return res + } + + const results = await Promise.all([ + batchedWrapped(1), + batchedWrapped(2), + batchedWrapped(3), + batchedWrapped(3), + batchedWrapped(4), + ]) + const results2 = await Promise.all([batchedWrapped(4), batchedWrapped(5), batchedWrapped(6)]) + + expect(results).toEqual([2, 4, 6, 6, 8]) + expect(results2).toEqual([8, 10, 12]) + expect(log).toMatchInlineSnapshot(` + [ + "[start] batched() 1", + "[start] fetch() 1", + "[start] batched() 2", + "[start] batched() 3", + "[start] batched() 3", + "[start] batched() 4", + "[end] fetch() 1", + "[end] batched() 1 => 2", + "[start] fetch() 2, 3, 4", + "[end] fetch() 2, 3, 4", + "[end] batched() 2 => 4", + "[end] batched() 3 => 6", + "[end] batched() 3 => 6", + "[end] batched() 4 => 8", + "[start] batched() 4", + "[start] fetch() 4", + "[start] batched() 5", + "[start] batched() 6", + "[end] fetch() 4", + "[end] batched() 4 => 8", + "[start] fetch() 5, 6", + "[end] fetch() 5, 6", + "[end] batched() 5 => 10", + "[end] batched() 6 => 12", + ] + `) + }) + + it('should correctly limit batch size', async () => { + const log: string[] = [] + + const fetch = async (client: BaseTelegramClient, items: number[]) => { + log.push(`[start] fetch() ${items.join(', ')}`) + + await sleep(10) + + log.push(`[end] fetch() ${items.join(', ')}`) + + return items.map((it) => it * 2) + } + + const batched = batchedQuery({ + fetch, + inputKey: (it) => it, + outputKey: (it) => it / 2, + maxBatchSize: 2, + }) + + const batchedWrapped = async (item: number) => { + log.push(`[start] batched() ${item}`) + + 
const res = await batched(client, item) + + log.push(`[end] batched() ${item} => ${res}`) + + return res + } + + const results = await Promise.all([ + batchedWrapped(1), + batchedWrapped(2), + batchedWrapped(3), + batchedWrapped(3), + batchedWrapped(4), + batchedWrapped(5), + batchedWrapped(6), + ]) + const results2 = await Promise.all([batchedWrapped(6), batchedWrapped(7), batchedWrapped(8)]) + + expect(results).toEqual([2, 4, 6, 6, 8, 10, 12]) + expect(results2).toEqual([12, 14, 16]) + expect(log).toMatchInlineSnapshot(` + [ + "[start] batched() 1", + "[start] fetch() 1", + "[start] batched() 2", + "[start] batched() 3", + "[start] batched() 3", + "[start] batched() 4", + "[start] batched() 5", + "[start] batched() 6", + "[end] fetch() 1", + "[end] batched() 1 => 2", + "[start] fetch() 2, 3", + "[end] fetch() 2, 3", + "[end] batched() 2 => 4", + "[end] batched() 3 => 6", + "[end] batched() 3 => 6", + "[start] fetch() 4, 5", + "[end] fetch() 4, 5", + "[end] batched() 4 => 8", + "[end] batched() 5 => 10", + "[start] fetch() 6", + "[end] fetch() 6", + "[end] batched() 6 => 12", + "[start] batched() 6", + "[start] fetch() 6", + "[start] batched() 7", + "[start] batched() 8", + "[end] fetch() 6", + "[end] batched() 6 => 12", + "[start] fetch() 7, 8", + "[end] fetch() 7, 8", + "[end] batched() 7 => 14", + "[end] batched() 8 => 16", + ] + `) + }) + + it('should correctly do concurrent requests', async () => { + const log: string[] = [] + + const fetch = async (client: BaseTelegramClient, items: number[]) => { + log.push(`[start] fetch() ${items.join(', ')}`) + + await sleep(10) + + log.push(`[end] fetch() ${items.join(', ')}`) + + return items.map((it) => it * 2) + } + + const batched = batchedQuery({ + fetch, + inputKey: (it) => it, + outputKey: (it) => it / 2, + maxBatchSize: 2, + maxConcurrent: 2, + }) + + const batchedWrapped = async (item: number) => { + log.push(`[start] batched() ${item}`) + + const res = await batched(client, item) + + log.push(`[end] batched() 
${item} => ${res}`) + + return res + } + + const results = await Promise.all([ + batchedWrapped(1), + batchedWrapped(2), + batchedWrapped(3), + batchedWrapped(3), + batchedWrapped(4), + batchedWrapped(5), + batchedWrapped(6), + ]) + const results2 = await Promise.all([batchedWrapped(6), batchedWrapped(7), batchedWrapped(8)]) + + expect(results).toEqual([2, 4, 6, 6, 8, 10, 12]) + expect(results2).toEqual([12, 14, 16]) + expect(log).toMatchInlineSnapshot(` + [ + "[start] batched() 1", + "[start] fetch() 1", + "[start] batched() 2", + "[start] fetch() 2", + "[start] batched() 3", + "[start] batched() 3", + "[start] batched() 4", + "[start] batched() 5", + "[start] batched() 6", + "[end] fetch() 1", + "[end] batched() 1 => 2", + "[start] fetch() 3, 4", + "[end] fetch() 2", + "[end] batched() 2 => 4", + "[start] fetch() 5, 6", + "[end] fetch() 3, 4", + "[end] batched() 3 => 6", + "[end] batched() 3 => 6", + "[end] batched() 4 => 8", + "[end] fetch() 5, 6", + "[end] batched() 5 => 10", + "[end] batched() 6 => 12", + "[start] batched() 6", + "[start] fetch() 6", + "[start] batched() 7", + "[start] fetch() 7", + "[start] batched() 8", + "[end] fetch() 6", + "[end] batched() 6 => 12", + "[start] fetch() 8", + "[end] fetch() 7", + "[end] batched() 7 => 14", + "[end] fetch() 8", + "[end] batched() 8 => 16", + ] + `) + }) + + it('should correctly handle errors', async () => { + const log: string[] = [] + + const fetch = async (client: BaseTelegramClient, items: number[]) => { + log.push(`[start] fetch() ${items.join(', ')}`) + + await sleep(10) + + if (items.includes(2)) { + log.push(`[error] fetch() ${items.join(', ')}`) + throw new Error('test') + } + + log.push(`[end] fetch() ${items.join(', ')}`) + + return items.map((it) => it * 2) + } + + const batched = batchedQuery({ + fetch, + inputKey: (it) => it, + outputKey: (it) => it / 2, + maxBatchSize: 2, + }) + + const batchedWrapped = async (item: number) => { + log.push(`[start] batched() ${item}`) + + let res + + try { + 
res = await batched(client, item) + } catch (e) { + log.push(`[error] batched() ${item} => ${(e as Error).message}`) + + return null + } + + log.push(`[end] batched() ${item} => ${res}`) + + return res + } + + const res = await Promise.all([batchedWrapped(1), batchedWrapped(2), batchedWrapped(3)]) + + // second batch will fail entirely because of an error in one of the items. + expect(res).toEqual([2, null, null]) + expect(log).toMatchInlineSnapshot(` + [ + "[start] batched() 1", + "[start] fetch() 1", + "[start] batched() 2", + "[start] batched() 3", + "[end] fetch() 1", + "[end] batched() 1 => 2", + "[start] fetch() 2, 3", + "[error] fetch() 2, 3", + "[error] batched() 2 => test", + "[error] batched() 3 => test", + ] + `) + }) + + it('should not share state across clients', async () => { + const client2 = new StubTelegramClient() + client2.log.prefix = '[2]' + + const log: string[] = [] + + const fetch = async (client: BaseTelegramClient, items: number[]) => { + log.push(`[start] ${client.log.prefix} fetch() ${items.join(', ')}`) + + await sleep(10) + + log.push(`[end] ${client.log.prefix} fetch() ${items.join(', ')}`) + + return items.map((it) => it * 2) + } + + const batched = batchedQuery({ + fetch, + inputKey: (it) => it, + outputKey: (it) => it / 2, + }) + + const batchedWrapped = async (item: number, client_ = client) => { + log.push(`[start] ${client_.log.prefix} batched() ${item}`) + + const res = await batched(client_, item) + + log.push(`[end] ${client_.log.prefix} batched() ${item} => ${res}`) + + return res + } + + const results = await Promise.all([ + batchedWrapped(1), + batchedWrapped(2), + batchedWrapped(3), + batchedWrapped(3, client2), + batchedWrapped(4, client2), + batchedWrapped(5), + batchedWrapped(6, client2), + ]) + const results2 = await Promise.all([batchedWrapped(6, client2), batchedWrapped(7), batchedWrapped(8, client2)]) + + expect(results).toEqual([2, 4, 6, 6, 8, 10, 12]) + expect(results2).toEqual([12, 14, 16]) + + 
expect(log).toMatchInlineSnapshot(` + [ + "[start] [1] batched() 1", + "[start] [1] fetch() 1", + "[start] [1] batched() 2", + "[start] [1] batched() 3", + "[start] [2] batched() 3", + "[start] [2] fetch() 3", + "[start] [2] batched() 4", + "[start] [1] batched() 5", + "[start] [2] batched() 6", + "[end] [1] fetch() 1", + "[end] [1] batched() 1 => 2", + "[start] [1] fetch() 2, 3, 5", + "[end] [2] fetch() 3", + "[end] [2] batched() 3 => 6", + "[start] [2] fetch() 4, 6", + "[end] [1] fetch() 2, 3, 5", + "[end] [1] batched() 2 => 4", + "[end] [1] batched() 3 => 6", + "[end] [1] batched() 5 => 10", + "[end] [2] fetch() 4, 6", + "[end] [2] batched() 4 => 8", + "[end] [2] batched() 6 => 12", + "[start] [2] batched() 6", + "[start] [2] fetch() 6", + "[start] [1] batched() 7", + "[start] [1] fetch() 7", + "[start] [2] batched() 8", + "[end] [2] fetch() 6", + "[end] [2] batched() 6 => 12", + "[start] [2] fetch() 8", + "[end] [1] fetch() 7", + "[end] [1] batched() 7 => 14", + "[end] [2] fetch() 8", + "[end] [2] batched() 8 => 16", + ] + `) + }) + + it('should correctly handle fetcher omitting some items', async () => { + const log: string[] = [] + + const fetch = async (client: BaseTelegramClient, items: number[]) => { + log.push(`[start] fetch() ${items.join(', ')}`) + + await sleep(10) + + log.push(`[end] fetch() ${items.join(', ')}`) + + return items.filter((it) => it !== 6).map((it) => it * 2) + } + + const batched = batchedQuery({ + fetch, + inputKey: (it) => it, + outputKey: (it) => it / 2, + }) + + const batchedWrapped = async (item: number) => { + log.push(`[start] batched() ${item}`) + + const res = await batched(client, item) + + log.push(`[end] batched() ${item} => ${res}`) + + return res + } + + const results = await Promise.all([ + batchedWrapped(1), + batchedWrapped(2), + batchedWrapped(3), + batchedWrapped(3), + batchedWrapped(4), + batchedWrapped(5), + batchedWrapped(6), + ]) + const results2 = await Promise.all([batchedWrapped(6), batchedWrapped(7), 
batchedWrapped(8)]) + + expect(results).toEqual([2, 4, 6, 6, 8, 10, null]) + expect(results2).toEqual([null, 14, 16]) + expect(log).toMatchInlineSnapshot(` + [ + "[start] batched() 1", + "[start] fetch() 1", + "[start] batched() 2", + "[start] batched() 3", + "[start] batched() 3", + "[start] batched() 4", + "[start] batched() 5", + "[start] batched() 6", + "[end] fetch() 1", + "[end] batched() 1 => 2", + "[start] fetch() 2, 3, 4, 5, 6", + "[end] fetch() 2, 3, 4, 5, 6", + "[end] batched() 2 => 4", + "[end] batched() 3 => 6", + "[end] batched() 3 => 6", + "[end] batched() 4 => 8", + "[end] batched() 5 => 10", + "[end] batched() 6 => null", + "[start] batched() 6", + "[start] fetch() 6", + "[start] batched() 7", + "[start] batched() 8", + "[end] fetch() 6", + "[end] batched() 6 => null", + "[start] fetch() 7, 8", + "[end] fetch() 7, 8", + "[end] batched() 7 => 14", + "[end] batched() 8 => 16", + ] + `) + }) + + it('should correctly retry failed batches one by one entirely', async () => { + const log: string[] = [] + + const fetch = async (client: BaseTelegramClient, items: number[]) => { + log.push(`[start] fetch() ${items.join(', ')}`) + + await sleep(10) + + if (items.includes(2)) { + log.push(`[error] fetch() ${items.join(', ')}`) + throw new Error('test') + } + + log.push(`[end] fetch() ${items.join(', ')}`) + + return items.map((it) => it * 2) + } + + const batched = batchedQuery({ + fetch, + inputKey: (it) => it, + outputKey: (it) => it / 2, + maxBatchSize: 2, + retrySingleOnError: () => true, + }) + + const batchedWrapped = async (item: number) => { + log.push(`[start] batched() ${item}`) + + let res + + try { + res = await batched(client, item) + } catch (e) { + log.push(`[error] batched() ${item} => ${(e as Error).message}`) + + return null + } + + log.push(`[end] batched() ${item} => ${res}`) + + return res + } + + const res = await Promise.all([batchedWrapped(1), batchedWrapped(2), batchedWrapped(3)]) + + expect(res).toEqual([2, null, 6]) + 
expect(log).toMatchInlineSnapshot(` + [ + "[start] batched() 1", + "[start] fetch() 1", + "[start] batched() 2", + "[start] batched() 3", + "[end] fetch() 1", + "[end] batched() 1 => 2", + "[start] fetch() 2, 3", + "[error] fetch() 2, 3", + "[start] fetch() 2", + "[error] fetch() 2", + "[error] batched() 2 => test", + "[start] fetch() 3", + "[end] fetch() 3", + "[end] batched() 3 => 6", + ] + `) + }) + + it('should correctly retry failed batches one by one partially', async () => { + const log: string[] = [] + + const fetch = async (client: BaseTelegramClient, items: number[]) => { + log.push(`[start] fetch() ${items.join(', ')}`) + + await sleep(10) + + if (items.includes(2) || items.includes(4)) { + log.push(`[error] fetch() ${items.join(', ')}`) + throw new Error('test') + } + + log.push(`[end] fetch() ${items.join(', ')}`) + + return items.map((it) => it * 2) + } + + const batched = batchedQuery({ + fetch, + inputKey: (it) => it, + outputKey: (it) => it / 2, + retrySingleOnError: () => [3, 4], + }) + + const batchedWrapped = async (item: number) => { + log.push(`[start] batched() ${item}`) + + let res + + try { + res = await batched(client, item) + } catch (e) { + log.push(`[error] batched() ${item} => ${(e as Error).message}`) + + return null + } + + log.push(`[end] batched() ${item} => ${res}`) + + return res + } + + const res = await Promise.all([batchedWrapped(1), batchedWrapped(2), batchedWrapped(3), batchedWrapped(4)]) + + expect(res).toEqual([2, null, 6, null]) + expect(log).toMatchInlineSnapshot(` + [ + "[start] batched() 1", + "[start] fetch() 1", + "[start] batched() 2", + "[start] batched() 3", + "[start] batched() 4", + "[end] fetch() 1", + "[end] batched() 1 => 2", + "[start] fetch() 2, 3, 4", + "[error] fetch() 2, 3, 4", + "[error] batched() 2 => test", + "[start] fetch() 3", + "[end] fetch() 3", + "[end] batched() 3 => 6", + "[start] fetch() 4", + "[error] fetch() 4", + "[error] batched() 4 => test", + ] + `) + }) +}) diff --git 
a/packages/client/src/utils/query-batcher.ts b/packages/client/src/utils/query-batcher.ts new file mode 100644 index 00000000..dc39ed45 --- /dev/null +++ b/packages/client/src/utils/query-batcher.ts @@ -0,0 +1,222 @@ +import { BaseTelegramClient } from '@mtcute/core' +import { Deque } from '@mtcute/core/utils.js' + +type Resolve = (value: T | PromiseLike) => void +type Reject = (err?: unknown) => void + +type WaitersMap = Map, Reject][]> +interface InternalState { + waiters: WaitersMap + fetchingKeys: Set + retryQueue: Deque + numRunning: number +} + +/** + * Helper function for building batched queries. + * + * Concepts: + * - "input" - items being passed to the query function + * - "output" - items returned by the query function + * - "key" - unique identifier of the item, which should be derivable from both input and output. + * used for matching input and output items and deduplicating them. + */ +export function batchedQuery(params: { + /** + * Fetcher function, taking an array of input items and returning an array of output items. + * + * If some item is not found, it should be omitted from the result array, + * this way the corresponding request will be resolved with `null`. + */ + fetch: (client: BaseTelegramClient, items: T[]) => Promise + + /** Key derivation function for input items */ + inputKey: (item: T, client: BaseTelegramClient) => K + /** Key derivation function for output items */ + outputKey: (item: U, client: BaseTelegramClient) => K + + /** + * Maximum number of items to be passed to the `fetch` function at once. + * + * It is recommended to pass ~half of the maximum allowed by the server, + * since in some cases failing on a single item will cause the whole batch to fail. + * + * @default Infinity + */ + maxBatchSize?: number + + /** + * Maximum number of concurrently running queries. 
+ * + * @default 1 + */ + maxConcurrent?: number + + /** + * In case of an error, we can try retrying the query for some items one-by-one, + * to avoid failing the whole batch. + * + * @param items Items contained in the batch that failed + * @param err Error that was thrown by the fetcher + * @returns `true` if the query should be retried for all items, `false` if it should be retried for none, + * or an array of items for which the query should be retried (waiters for other items will throw `err`). + */ + retrySingleOnError?: (items: T[], err: unknown) => boolean | T[] +}): (client: BaseTelegramClient, item: T) => Promise { + const { inputKey, outputKey, fetch, maxBatchSize = Infinity, maxConcurrent = 1, retrySingleOnError } = params + + const symbol = Symbol('batchedQueryState') + + function getState(client_: BaseTelegramClient) { + const client = client_ as { [symbol]?: InternalState } + + if (!client[symbol]) { + client[symbol] = { + waiters: new Map(), + fetchingKeys: new Set(), + retryQueue: new Deque(), + numRunning: 0, + } + } + + return client[symbol] + } + + function addWaiter(client: BaseTelegramClient, waiters: WaitersMap, item: T) { + const key = inputKey(item, client) + + let arr = waiters.get(key) + + if (!arr) { + arr = [] + waiters.set(key, arr) + } + + return new Promise((resolve, reject) => { + arr!.push([item, resolve, reject]) + }) + } + + function popWaiters(waiters: WaitersMap, key: K) { + const arr = waiters.get(key) + if (!arr) return [] + + waiters.delete(key) + + return arr + } + + function startLoops(client: BaseTelegramClient, state: InternalState) { + for (let i = state.numRunning; i <= maxConcurrent; i++) { + processPending(client, state) + } + } + + function processPending(client: BaseTelegramClient, state: InternalState) { + const { waiters, fetchingKeys, retryQueue } = state + + if (state.numRunning >= maxConcurrent) return + + const request: T[] = [] + const requestKeys: K[] = [] + let isRetryRequest = false + + if 
(retryQueue.length > 0) { + // handle retries in the same loop so we can easily use the same concurrency pool + isRetryRequest = true + + const it = retryQueue.popFront()! + request.push(it) + + const key = inputKey(it, client) + requestKeys.push(key) + fetchingKeys.add(key) + } else { + for (const it of waiters.keys()) { + if (fetchingKeys.has(it)) continue + + request.push(waiters.get(it)![0][0]) + requestKeys.push(it) + fetchingKeys.add(it) + + if (request.length === maxBatchSize) break + } + + if (request.length === 0) return + } + + state.numRunning += 1 + + // eslint-disable-next-line @typescript-eslint/no-floating-promises + fetch(client, request) + .then((res) => { + const receivedKeys = new Set() + + for (const it of res) { + const key = outputKey(it, client) + receivedKeys.add(key) + + for (const waiter of popWaiters(waiters, key)) { + waiter[1](it) + } + + fetchingKeys.delete(key) + } + + for (const key of requestKeys) { + if (!receivedKeys.has(key)) { + for (const waiter of popWaiters(waiters, key)) { + waiter[1](null) + } + + fetchingKeys.delete(key) + } + } + }) + .catch((err: unknown) => { + if (retrySingleOnError && !isRetryRequest) { + const retry = retrySingleOnError(request, err) + + if (retry === true) { + for (const it of request) { + retryQueue.pushBack(it) + } + + return + } else if (Array.isArray(retry)) { + for (const req of retry) { + const requestKeysIdx = request.indexOf(req) + if (requestKeysIdx < 0) continue + + retryQueue.pushBack(req) + // to avoid rejecting it below + request.splice(requestKeysIdx, 1) + requestKeys.splice(requestKeysIdx, 1) + } + } + } + + for (const key of requestKeys) { + for (const waiter of popWaiters(waiters, key)) { + waiter[2](err) + } + + fetchingKeys.delete(key) + } + }) + .then(() => { + state.numRunning -= 1 + + if (waiters.size > 0) processPending(client, state) + }) + } + + return function (client, item) { + const state = getState(client) + const promise = addWaiter(client, state.waiters, item) + + 
startLoops(client, state) + + return promise + } +}