feat(client)!: batched queries

breaking: getUsers may now return null
This commit is contained in:
alina 🌸 2023-11-23 19:11:00 +03:00
parent 238ef7c826
commit 8249bd2ea4
Signed by: teidesu
SSH key fingerprint: SHA256:uNeCpw6aTSU4aIObXLvHfLkDa82HWH9EiOj9AXOIRpI
8 changed files with 998 additions and 52 deletions

View file

@ -4224,7 +4224,7 @@ export interface TelegramClient extends BaseTelegramClient {
/**
* Boost a given channel
*
* **Available**: both users and bots
* **Available**: 👤 users only
*
* @param peerId Peer ID to boost
*/
@ -4232,7 +4232,7 @@ export interface TelegramClient extends BaseTelegramClient {
/**
* Check if the current user can apply boost to a given channel
*
* **Available**: both users and bots
* **Available**: 👤 users only
*
* @param peerId Peer ID whose stories to fetch
* @returns
@ -4248,7 +4248,7 @@ export interface TelegramClient extends BaseTelegramClient {
/**
* Check if the current user can post stories as a given peer
*
* **Available**: both users and bots
* **Available**: 👤 users only
*
* @param peerId Peer ID whose stories to fetch
* @returns
@ -4260,7 +4260,7 @@ export interface TelegramClient extends BaseTelegramClient {
/**
* Delete a story
*
* **Available**: both users and bots
* **Available**: 👤 users only
*
* @returns IDs of stories that were removed
*/
@ -4280,7 +4280,7 @@ export interface TelegramClient extends BaseTelegramClient {
/**
* Edit a sent story
*
* **Available**: both users and bots
* **Available**: 👤 users only
*
* @returns Edited story
*/
@ -4321,7 +4321,7 @@ export interface TelegramClient extends BaseTelegramClient {
}): Promise<Story>
/**
* Get all stories (e.g. to load the top bar)
* **Available**: both users and bots
* **Available**: 👤 users only
*
*/
getAllStories(params?: {
@ -4370,14 +4370,14 @@ export interface TelegramClient extends BaseTelegramClient {
/**
* Get stories of a given peer
*
* **Available**: both users and bots
* **Available**: 👤 users only
*
* @param peerId Peer ID whose stories to fetch
*/
getPeerStories(peerId: InputPeerLike): Promise<PeerStories>
/**
* Get profile stories
* **Available**: both users and bots
* **Available**: 👤 users only
*
*/
getProfileStories(
@ -4418,7 +4418,7 @@ export interface TelegramClient extends BaseTelegramClient {
* Get brief information about stories interactions.
*
* The result will be in the same order as the input IDs
* **Available**: both users and bots
* **Available**: 👤 users only
*
*/
getStoriesInteractions(peerId: InputPeerLike, storyIds: MaybeArray<number>): Promise<StoryInteractions[]>
@ -4429,13 +4429,13 @@ export interface TelegramClient extends BaseTelegramClient {
* and if the user doesn't have a username, `USER_PUBLIC_MISSING` is thrown.
*
* I have no idea why is this an RPC call, but whatever
* **Available**: both users and bots
* **Available**: 👤 users only
*
*/
getStoryLink(peerId: InputPeerLike, storyId: number): Promise<string>
/**
* Get viewers list of a story
* **Available**: both users and bots
* **Available**: 👤 users only
*
*/
getStoryViewers(
@ -4478,7 +4478,7 @@ export interface TelegramClient extends BaseTelegramClient {
* Hide own stories views (activate so called "stealth mode")
*
* Currently has a cooldown of 1 hour, and throws FLOOD_WAIT error if it is on cooldown.
* **Available**: both users and bots
* **Available**: 👤 users only
*
*/
hideMyStoriesViews(params?: {
@ -4502,7 +4502,7 @@ export interface TelegramClient extends BaseTelegramClient {
* This should be used for pinned stories, as they can't
* be marked as read when the user sees them ({@link Story#isActive} == false)
*
* **Available**: both users and bots
* **Available**: 👤 users only
*
* @param peerId Peer ID whose stories to mark as read
* @param ids ID(s) of the stories to increment views of (max 200)
@ -4608,7 +4608,7 @@ export interface TelegramClient extends BaseTelegramClient {
*
* This should only be used for "active" stories ({@link Story#isActive} == false)
*
* **Available**: both users and bots
* **Available**: 👤 users only
*
* @param peerId Peer ID whose stories to mark as read
* @returns IDs of the stores that were marked as read
@ -4616,7 +4616,7 @@ export interface TelegramClient extends BaseTelegramClient {
readStories(peerId: InputPeerLike, maxId: number): Promise<number[]>
/**
* Report a story (or multiple stories) to the moderation team
* **Available**: both users and bots
* **Available**: 👤 users only
*
*/
reportStory(
@ -4638,7 +4638,7 @@ export interface TelegramClient extends BaseTelegramClient {
): Promise<void>
/**
* Send (or remove) a reaction to a story
* **Available**: both users and bots
* **Available**: 👤 users only
*
*/
sendStoryReaction(params: {
@ -4653,7 +4653,7 @@ export interface TelegramClient extends BaseTelegramClient {
/**
* Send a story
*
* **Available**: both users and bots
* **Available**: 👤 users only
*
* @returns Created story
*/
@ -4711,14 +4711,14 @@ export interface TelegramClient extends BaseTelegramClient {
* Toggle whether peer's stories are archived (hidden) or not.
*
* This **does not** archive the chat with that peer, only stories.
* **Available**: both users and bots
* **Available**: 👤 users only
*
*/
togglePeerStoriesArchived(peerId: InputPeerLike, archived: boolean): Promise<void>
/**
* Toggle one or more stories pinned status
*
* **Available**: both users and bots
* **Available**: 👤 users only
*
* @returns IDs of stories that were toggled
*/
@ -4829,7 +4829,7 @@ export interface TelegramClient extends BaseTelegramClient {
/**
* Edit "close friends" list directly using user IDs
*
* **Available**: both users and bots
* **Available**: 👤 users only
*
* @param ids User IDs
*/
@ -4837,7 +4837,7 @@ export interface TelegramClient extends BaseTelegramClient {
/**
* Edit "close friends" list using `InputPeerLike`s
*
* **Available**: both users and bots
* **Available**: 👤 users only
*
* @param ids User IDs
*/
@ -4918,7 +4918,7 @@ export interface TelegramClient extends BaseTelegramClient {
*
* @param ids Users' identifiers. Can be ID, username, phone number, `"me"`, `"self"` or TL object
*/
getUsers(ids: MaybeArray<InputPeerLike>): Promise<User[]>
getUsers(ids: MaybeArray<InputPeerLike>): Promise<(User | null)[]>
/**
* Iterate over profile photos
*
@ -4975,7 +4975,7 @@ export interface TelegramClient extends BaseTelegramClient {
* **Available**: both users and bots
*
* @param peerId The peer identifier that you want to extract the `InputPeer` from.
* @param [force=false] Whether to force re-fetch the peer from the server
* @param [force=false] Whether to force re-fetch the peer from the server (only for usernames and phone numbers)
*/
resolvePeer(peerId: InputPeerLike, force?: boolean): Promise<tl.TypeInputPeer>
/**

View file

@ -0,0 +1,122 @@
import { MtArgumentError, tl } from '@mtcute/core'
import { batchedQuery } from '../../utils/query-batcher.js'
import { getAuthState } from '../auth/_state.js'
/** @internal */
export const _getUsersBatched = batchedQuery<tl.TypeInputUser, tl.TypeUser, number>({
fetch: (client, items) =>
client
.call({
_: 'users.getUsers',
id: items,
// there's actually not much point in filtering, since telegram currently simply omits the missing users
// but maybe it will change in the future and i don't want to think about it
})
.then((res) => res.filter((it) => it._ !== 'userEmpty')),
inputKey: (item, client) => {
switch (item._) {
case 'inputUser':
case 'inputUserFromMessage':
return item.userId
case 'inputUserSelf':
return getAuthState(client).userId!
default:
throw new MtArgumentError('Invalid input user')
}
},
outputKey: (item) => item.id,
maxBatchSize: 50,
maxConcurrent: 3,
retrySingleOnError: (items, err) => {
if (!tl.RpcError.is(err)) return false
let fromMessageCount = 0
for (const item of items) {
if (item._ === 'inputUserFromMessage') fromMessageCount++
}
if (fromMessageCount === 0) {
// no point in retrying, the error is likely not related to the inputUserFromMessage
return false
}
switch (err.text) {
case 'CHANNEL_INVALID':
case 'USER_BANNED_IN_CHANNEL':
case 'CHANNEL_PRIVATE':
case 'MSG_ID_INVALID':
return true
default:
return false
}
},
})
/** @internal */
export const _getChatsBatched = batchedQuery<number, tl.RawChat, number>({
fetch: (client, items) =>
client
.call({
_: 'messages.getChats',
id: items,
})
.then((res) => res.chats.filter((it): it is tl.RawChat => it._ === 'chat')),
inputKey: (id) => id,
outputKey: (item) => item.id,
maxBatchSize: 50,
maxConcurrent: 3,
})
/** @internal */
export const _getChannelsBatched = batchedQuery<tl.TypeInputChannel, tl.RawChannel | tl.RawChannelForbidden, number>({
fetch: (client, items) =>
client
.call({
_: 'channels.getChannels',
id: items,
})
.then((res) =>
res.chats.filter(
(it): it is tl.RawChannel | tl.RawChannelForbidden =>
it._ === 'channel' || it._ === 'channelForbidden',
),
),
inputKey: (id) => {
switch (id._) {
case 'inputChannel':
case 'inputChannelFromMessage':
return id.channelId
default:
throw new MtArgumentError('Invalid input channel')
}
},
outputKey: (item) => item.id,
maxBatchSize: 50,
maxConcurrent: 3,
retrySingleOnError: (items, err) => {
if (!tl.RpcError.is(err)) return false
let fromMessageCount = 0
for (const item of items) {
if (item._ === 'inputChannelFromMessage') fromMessageCount++
}
if (fromMessageCount === 0) {
// no point in retrying, the error is likely not related to the inputChannelFromMessage
return false
}
switch (err.text) {
case 'CHANNEL_INVALID':
case 'USER_BANNED_IN_CHANNEL':
case 'CHANNEL_PRIVATE':
case 'MSG_ID_INVALID':
return true
default:
return false
}
},
})

View file

@ -1,6 +1,6 @@
import { BaseTelegramClient, MtArgumentError, tl } from '@mtcute/core'
import { Chat, InputPeerLike } from '../../types/index.js'
import { Chat, InputPeerLike, MtPeerNotFoundError } from '../../types/index.js'
import {
INVITE_LINK_REGEX,
isInputPeerChannel,
@ -10,6 +10,7 @@ import {
normalizeToInputUser,
} from '../../utils/peer-utils.js'
import { resolvePeer } from '../users/resolve-peer.js'
import { _getChannelsBatched, _getChatsBatched, _getUsersBatched } from './batched-queries.js'
// @available=both
/**
@ -40,26 +41,16 @@ export async function getChat(client: BaseTelegramClient, chatId: InputPeerLike)
const peer = await resolvePeer(client, chatId)
let res: tl.TypeChat | tl.TypeUser
let res: tl.TypeChat | tl.TypeUser | null
if (isInputPeerChannel(peer)) {
const r = await client.call({
_: 'channels.getChannels',
id: [normalizeToInputChannel(peer)],
})
res = r.chats[0]
res = await _getChannelsBatched(client, normalizeToInputChannel(peer))
} else if (isInputPeerUser(peer)) {
const r = await client.call({
_: 'users.getUsers',
id: [normalizeToInputUser(peer)],
})
res = r[0]
res = await _getUsersBatched(client, normalizeToInputUser(peer))
} else if (isInputPeerChat(peer)) {
const r = await client.call({
_: 'messages.getChats',
id: [peer.chatId],
})
res = r.chats[0]
res = await _getChatsBatched(client, peer.chatId)
} else throw new Error('should not happen')
if (!res) throw new MtPeerNotFoundError(`Chat ${JSON.stringify(chatId)} was not found`)
return new Chat(res)
}

View file

@ -31,7 +31,7 @@ describe('getUsers', () => {
expect(await getUsers(client, 123)).toEqual([new User(createStub('user', { id: 123, accessHash: Long.ZERO }))])
})
it('should ignore userEmpty', async () => {
expect(await getUsers(client, 1)).toEqual([])
it('should return null for userEmpty', async () => {
expect(await getUsers(client, 1)).toEqual([null])
})
})

View file

@ -2,6 +2,8 @@ import { BaseTelegramClient, MaybeArray } from '@mtcute/core'
import { InputPeerLike, User } from '../../types/index.js'
import { normalizeToInputUser } from '../../utils/peer-utils.js'
import { _getUsersBatched } from '../chats/batched-queries.js'
import { resolvePeer } from './resolve-peer.js'
import { resolvePeerMany } from './resolve-peer-many.js'
/**
@ -12,16 +14,18 @@ import { resolvePeerMany } from './resolve-peer-many.js'
*
* @param ids Users' identifiers. Can be ID, username, phone number, `"me"`, `"self"` or TL object
*/
export async function getUsers(client: BaseTelegramClient, ids: MaybeArray<InputPeerLike>): Promise<User[]> {
const isArray = Array.isArray(ids)
if (!isArray) ids = [ids as InputPeerLike]
export async function getUsers(client: BaseTelegramClient, ids: MaybeArray<InputPeerLike>): Promise<(User | null)[]> {
if (!Array.isArray(ids)) {
// avoid unnecessary overhead of Promise.all and resolvePeerMany
const res = await _getUsersBatched(client, normalizeToInputUser(await resolvePeer(client, ids)))
const inputPeers = await resolvePeerMany(client, ids as InputPeerLike[], normalizeToInputUser)
return [res ? new User(res) : null]
}
const res = await client.call({
_: 'users.getUsers',
id: inputPeers,
})
const inputPeers = await resolvePeerMany(client, ids, normalizeToInputUser)
return res.filter((it) => it._ !== 'userEmpty').map((it) => new User(it))
// pooling will be done by the helper
const res = await Promise.all(inputPeers.map((peer) => _getUsersBatched(client, peer)))
return res.map((it) => (it ? new User(it) : null))
}

View file

@ -18,7 +18,7 @@ import { normalizeToInputPeer } from '../../utils/peer-utils.js'
* Useful when an `InputPeer` is needed in Raw API.
*
* @param peerId The peer identifier that you want to extract the `InputPeer` from.
* @param force Whether to force re-fetch the peer from the server (only applicable for usernames and phone numbers)
* @param force Whether to force re-fetch the peer from the server (only for usernames and phone numbers)
*/
export async function resolvePeer(
client: BaseTelegramClient,

View file

@ -0,0 +1,607 @@
import { describe, expect, it } from 'vitest'
import { BaseTelegramClient } from '@mtcute/core'
import { sleep } from '@mtcute/core/utils.js'
import { StubTelegramClient } from '@mtcute/test'
import { batchedQuery } from './query-batcher.js'
describe('batchedQuery', () => {
const client = new StubTelegramClient()
client.log.prefix = '[1]'
it('should correctly batch requests', async () => {
const log: string[] = []
const fetch = async (client: BaseTelegramClient, items: number[]) => {
log.push(`[start] fetch() ${items.join(', ')}`)
await sleep(10)
log.push(`[end] fetch() ${items.join(', ')}`)
return items.map((it) => it * 2)
}
const batched = batchedQuery({
fetch,
inputKey: (it) => it,
outputKey: (it) => it / 2,
})
const batchedWrapped = async (item: number) => {
log.push(`[start] batched() ${item}`)
const res = await batched(client, item)
log.push(`[end] batched() ${item} => ${res}`)
return res
}
const results = await Promise.all([
batchedWrapped(1),
batchedWrapped(2),
batchedWrapped(3),
batchedWrapped(3),
batchedWrapped(4),
])
const results2 = await Promise.all([batchedWrapped(4), batchedWrapped(5), batchedWrapped(6)])
expect(results).toEqual([2, 4, 6, 6, 8])
expect(results2).toEqual([8, 10, 12])
expect(log).toMatchInlineSnapshot(`
[
"[start] batched() 1",
"[start] fetch() 1",
"[start] batched() 2",
"[start] batched() 3",
"[start] batched() 3",
"[start] batched() 4",
"[end] fetch() 1",
"[end] batched() 1 => 2",
"[start] fetch() 2, 3, 4",
"[end] fetch() 2, 3, 4",
"[end] batched() 2 => 4",
"[end] batched() 3 => 6",
"[end] batched() 3 => 6",
"[end] batched() 4 => 8",
"[start] batched() 4",
"[start] fetch() 4",
"[start] batched() 5",
"[start] batched() 6",
"[end] fetch() 4",
"[end] batched() 4 => 8",
"[start] fetch() 5, 6",
"[end] fetch() 5, 6",
"[end] batched() 5 => 10",
"[end] batched() 6 => 12",
]
`)
})
it('should correctly limit batch size', async () => {
const log: string[] = []
const fetch = async (client: BaseTelegramClient, items: number[]) => {
log.push(`[start] fetch() ${items.join(', ')}`)
await sleep(10)
log.push(`[end] fetch() ${items.join(', ')}`)
return items.map((it) => it * 2)
}
const batched = batchedQuery({
fetch,
inputKey: (it) => it,
outputKey: (it) => it / 2,
maxBatchSize: 2,
})
const batchedWrapped = async (item: number) => {
log.push(`[start] batched() ${item}`)
const res = await batched(client, item)
log.push(`[end] batched() ${item} => ${res}`)
return res
}
const results = await Promise.all([
batchedWrapped(1),
batchedWrapped(2),
batchedWrapped(3),
batchedWrapped(3),
batchedWrapped(4),
batchedWrapped(5),
batchedWrapped(6),
])
const results2 = await Promise.all([batchedWrapped(6), batchedWrapped(7), batchedWrapped(8)])
expect(results).toEqual([2, 4, 6, 6, 8, 10, 12])
expect(results2).toEqual([12, 14, 16])
expect(log).toMatchInlineSnapshot(`
[
"[start] batched() 1",
"[start] fetch() 1",
"[start] batched() 2",
"[start] batched() 3",
"[start] batched() 3",
"[start] batched() 4",
"[start] batched() 5",
"[start] batched() 6",
"[end] fetch() 1",
"[end] batched() 1 => 2",
"[start] fetch() 2, 3",
"[end] fetch() 2, 3",
"[end] batched() 2 => 4",
"[end] batched() 3 => 6",
"[end] batched() 3 => 6",
"[start] fetch() 4, 5",
"[end] fetch() 4, 5",
"[end] batched() 4 => 8",
"[end] batched() 5 => 10",
"[start] fetch() 6",
"[end] fetch() 6",
"[end] batched() 6 => 12",
"[start] batched() 6",
"[start] fetch() 6",
"[start] batched() 7",
"[start] batched() 8",
"[end] fetch() 6",
"[end] batched() 6 => 12",
"[start] fetch() 7, 8",
"[end] fetch() 7, 8",
"[end] batched() 7 => 14",
"[end] batched() 8 => 16",
]
`)
})
it('should correctly do concurrent requests', async () => {
const log: string[] = []
const fetch = async (client: BaseTelegramClient, items: number[]) => {
log.push(`[start] fetch() ${items.join(', ')}`)
await sleep(10)
log.push(`[end] fetch() ${items.join(', ')}`)
return items.map((it) => it * 2)
}
const batched = batchedQuery({
fetch,
inputKey: (it) => it,
outputKey: (it) => it / 2,
maxBatchSize: 2,
maxConcurrent: 2,
})
const batchedWrapped = async (item: number) => {
log.push(`[start] batched() ${item}`)
const res = await batched(client, item)
log.push(`[end] batched() ${item} => ${res}`)
return res
}
const results = await Promise.all([
batchedWrapped(1),
batchedWrapped(2),
batchedWrapped(3),
batchedWrapped(3),
batchedWrapped(4),
batchedWrapped(5),
batchedWrapped(6),
])
const results2 = await Promise.all([batchedWrapped(6), batchedWrapped(7), batchedWrapped(8)])
expect(results).toEqual([2, 4, 6, 6, 8, 10, 12])
expect(results2).toEqual([12, 14, 16])
expect(log).toMatchInlineSnapshot(`
[
"[start] batched() 1",
"[start] fetch() 1",
"[start] batched() 2",
"[start] fetch() 2",
"[start] batched() 3",
"[start] batched() 3",
"[start] batched() 4",
"[start] batched() 5",
"[start] batched() 6",
"[end] fetch() 1",
"[end] batched() 1 => 2",
"[start] fetch() 3, 4",
"[end] fetch() 2",
"[end] batched() 2 => 4",
"[start] fetch() 5, 6",
"[end] fetch() 3, 4",
"[end] batched() 3 => 6",
"[end] batched() 3 => 6",
"[end] batched() 4 => 8",
"[end] fetch() 5, 6",
"[end] batched() 5 => 10",
"[end] batched() 6 => 12",
"[start] batched() 6",
"[start] fetch() 6",
"[start] batched() 7",
"[start] fetch() 7",
"[start] batched() 8",
"[end] fetch() 6",
"[end] batched() 6 => 12",
"[start] fetch() 8",
"[end] fetch() 7",
"[end] batched() 7 => 14",
"[end] fetch() 8",
"[end] batched() 8 => 16",
]
`)
})
it('should correctly handle errors', async () => {
const log: string[] = []
const fetch = async (client: BaseTelegramClient, items: number[]) => {
log.push(`[start] fetch() ${items.join(', ')}`)
await sleep(10)
if (items.includes(2)) {
log.push(`[error] fetch() ${items.join(', ')}`)
throw new Error('test')
}
log.push(`[end] fetch() ${items.join(', ')}`)
return items.map((it) => it * 2)
}
const batched = batchedQuery({
fetch,
inputKey: (it) => it,
outputKey: (it) => it / 2,
maxBatchSize: 2,
})
const batchedWrapped = async (item: number) => {
log.push(`[start] batched() ${item}`)
let res
try {
res = await batched(client, item)
} catch (e) {
log.push(`[error] batched() ${item} => ${(e as Error).message}`)
return null
}
log.push(`[end] batched() ${item} => ${res}`)
return res
}
const res = await Promise.all([batchedWrapped(1), batchedWrapped(2), batchedWrapped(3)])
// second batch will fail entirely because of an error in one of the items.
expect(res).toEqual([2, null, null])
expect(log).toMatchInlineSnapshot(`
[
"[start] batched() 1",
"[start] fetch() 1",
"[start] batched() 2",
"[start] batched() 3",
"[end] fetch() 1",
"[end] batched() 1 => 2",
"[start] fetch() 2, 3",
"[error] fetch() 2, 3",
"[error] batched() 2 => test",
"[error] batched() 3 => test",
]
`)
})
it('should not share state across clients', async () => {
const client2 = new StubTelegramClient()
client2.log.prefix = '[2]'
const log: string[] = []
const fetch = async (client: BaseTelegramClient, items: number[]) => {
log.push(`[start] ${client.log.prefix} fetch() ${items.join(', ')}`)
await sleep(10)
log.push(`[end] ${client.log.prefix} fetch() ${items.join(', ')}`)
return items.map((it) => it * 2)
}
const batched = batchedQuery({
fetch,
inputKey: (it) => it,
outputKey: (it) => it / 2,
})
const batchedWrapped = async (item: number, client_ = client) => {
log.push(`[start] ${client_.log.prefix} batched() ${item}`)
const res = await batched(client_, item)
log.push(`[end] ${client_.log.prefix} batched() ${item} => ${res}`)
return res
}
const results = await Promise.all([
batchedWrapped(1),
batchedWrapped(2),
batchedWrapped(3),
batchedWrapped(3, client2),
batchedWrapped(4, client2),
batchedWrapped(5),
batchedWrapped(6, client2),
])
const results2 = await Promise.all([batchedWrapped(6, client2), batchedWrapped(7), batchedWrapped(8, client2)])
expect(results).toEqual([2, 4, 6, 6, 8, 10, 12])
expect(results2).toEqual([12, 14, 16])
expect(log).toMatchInlineSnapshot(`
[
"[start] [1] batched() 1",
"[start] [1] fetch() 1",
"[start] [1] batched() 2",
"[start] [1] batched() 3",
"[start] [2] batched() 3",
"[start] [2] fetch() 3",
"[start] [2] batched() 4",
"[start] [1] batched() 5",
"[start] [2] batched() 6",
"[end] [1] fetch() 1",
"[end] [1] batched() 1 => 2",
"[start] [1] fetch() 2, 3, 5",
"[end] [2] fetch() 3",
"[end] [2] batched() 3 => 6",
"[start] [2] fetch() 4, 6",
"[end] [1] fetch() 2, 3, 5",
"[end] [1] batched() 2 => 4",
"[end] [1] batched() 3 => 6",
"[end] [1] batched() 5 => 10",
"[end] [2] fetch() 4, 6",
"[end] [2] batched() 4 => 8",
"[end] [2] batched() 6 => 12",
"[start] [2] batched() 6",
"[start] [2] fetch() 6",
"[start] [1] batched() 7",
"[start] [1] fetch() 7",
"[start] [2] batched() 8",
"[end] [2] fetch() 6",
"[end] [2] batched() 6 => 12",
"[start] [2] fetch() 8",
"[end] [1] fetch() 7",
"[end] [1] batched() 7 => 14",
"[end] [2] fetch() 8",
"[end] [2] batched() 8 => 16",
]
`)
})
it('should correctly handle fetcher omitting some items', async () => {
const log: string[] = []
const fetch = async (client: BaseTelegramClient, items: number[]) => {
log.push(`[start] fetch() ${items.join(', ')}`)
await sleep(10)
log.push(`[end] fetch() ${items.join(', ')}`)
return items.filter((it) => it !== 6).map((it) => it * 2)
}
const batched = batchedQuery({
fetch,
inputKey: (it) => it,
outputKey: (it) => it / 2,
})
const batchedWrapped = async (item: number) => {
log.push(`[start] batched() ${item}`)
const res = await batched(client, item)
log.push(`[end] batched() ${item} => ${res}`)
return res
}
const results = await Promise.all([
batchedWrapped(1),
batchedWrapped(2),
batchedWrapped(3),
batchedWrapped(3),
batchedWrapped(4),
batchedWrapped(5),
batchedWrapped(6),
])
const results2 = await Promise.all([batchedWrapped(6), batchedWrapped(7), batchedWrapped(8)])
expect(results).toEqual([2, 4, 6, 6, 8, 10, null])
expect(results2).toEqual([null, 14, 16])
expect(log).toMatchInlineSnapshot(`
[
"[start] batched() 1",
"[start] fetch() 1",
"[start] batched() 2",
"[start] batched() 3",
"[start] batched() 3",
"[start] batched() 4",
"[start] batched() 5",
"[start] batched() 6",
"[end] fetch() 1",
"[end] batched() 1 => 2",
"[start] fetch() 2, 3, 4, 5, 6",
"[end] fetch() 2, 3, 4, 5, 6",
"[end] batched() 2 => 4",
"[end] batched() 3 => 6",
"[end] batched() 3 => 6",
"[end] batched() 4 => 8",
"[end] batched() 5 => 10",
"[end] batched() 6 => null",
"[start] batched() 6",
"[start] fetch() 6",
"[start] batched() 7",
"[start] batched() 8",
"[end] fetch() 6",
"[end] batched() 6 => null",
"[start] fetch() 7, 8",
"[end] fetch() 7, 8",
"[end] batched() 7 => 14",
"[end] batched() 8 => 16",
]
`)
})
it('should correctly retry failed batches one by one entirely', async () => {
const log: string[] = []
const fetch = async (client: BaseTelegramClient, items: number[]) => {
log.push(`[start] fetch() ${items.join(', ')}`)
await sleep(10)
if (items.includes(2)) {
log.push(`[error] fetch() ${items.join(', ')}`)
throw new Error('test')
}
log.push(`[end] fetch() ${items.join(', ')}`)
return items.map((it) => it * 2)
}
const batched = batchedQuery({
fetch,
inputKey: (it) => it,
outputKey: (it) => it / 2,
maxBatchSize: 2,
retrySingleOnError: () => true,
})
const batchedWrapped = async (item: number) => {
log.push(`[start] batched() ${item}`)
let res
try {
res = await batched(client, item)
} catch (e) {
log.push(`[error] batched() ${item} => ${(e as Error).message}`)
return null
}
log.push(`[end] batched() ${item} => ${res}`)
return res
}
const res = await Promise.all([batchedWrapped(1), batchedWrapped(2), batchedWrapped(3)])
expect(res).toEqual([2, null, 6])
expect(log).toMatchInlineSnapshot(`
[
"[start] batched() 1",
"[start] fetch() 1",
"[start] batched() 2",
"[start] batched() 3",
"[end] fetch() 1",
"[end] batched() 1 => 2",
"[start] fetch() 2, 3",
"[error] fetch() 2, 3",
"[start] fetch() 2",
"[error] fetch() 2",
"[error] batched() 2 => test",
"[start] fetch() 3",
"[end] fetch() 3",
"[end] batched() 3 => 6",
]
`)
})
it('should correctly retry failed batches one by one partially', async () => {
const log: string[] = []
const fetch = async (client: BaseTelegramClient, items: number[]) => {
log.push(`[start] fetch() ${items.join(', ')}`)
await sleep(10)
if (items.includes(2) || items.includes(4)) {
log.push(`[error] fetch() ${items.join(', ')}`)
throw new Error('test')
}
log.push(`[end] fetch() ${items.join(', ')}`)
return items.map((it) => it * 2)
}
const batched = batchedQuery({
fetch,
inputKey: (it) => it,
outputKey: (it) => it / 2,
retrySingleOnError: () => [3, 4],
})
const batchedWrapped = async (item: number) => {
log.push(`[start] batched() ${item}`)
let res
try {
res = await batched(client, item)
} catch (e) {
log.push(`[error] batched() ${item} => ${(e as Error).message}`)
return null
}
log.push(`[end] batched() ${item} => ${res}`)
return res
}
const res = await Promise.all([batchedWrapped(1), batchedWrapped(2), batchedWrapped(3), batchedWrapped(4)])
expect(res).toEqual([2, null, 6, null])
expect(log).toMatchInlineSnapshot(`
[
"[start] batched() 1",
"[start] fetch() 1",
"[start] batched() 2",
"[start] batched() 3",
"[start] batched() 4",
"[end] fetch() 1",
"[end] batched() 1 => 2",
"[start] fetch() 2, 3, 4",
"[error] fetch() 2, 3, 4",
"[error] batched() 2 => test",
"[start] fetch() 3",
"[end] fetch() 3",
"[end] batched() 3 => 6",
"[start] fetch() 4",
"[error] fetch() 4",
"[error] batched() 4 => test",
]
`)
})
})

View file

@ -0,0 +1,222 @@
import { BaseTelegramClient } from '@mtcute/core'
import { Deque } from '@mtcute/core/utils.js'
type Resolve<T> = (value: T | PromiseLike<T>) => void
type Reject = (err?: unknown) => void
type WaitersMap<K, U, T> = Map<K, [T, Resolve<U | null>, Reject][]>
interface InternalState<K, U, T> {
waiters: WaitersMap<K, U, T>
fetchingKeys: Set<K>
retryQueue: Deque<T>
numRunning: number
}
/**
* Helper function for building batched queries.
*
* Concepts:
* - "input" - items being passed to the query function
* - "output" - items returned by the query function
* - "key" - unique identifier of the item, which should be deriveable from both input and output.
* used for matching input and output items and deduplicating them.
*/
export function batchedQuery<T, U, K extends string | number>(params: {
/**
* Fetcher function, taking an array of input items and returning an array of output items.
*
* If some item is not found, it should be omitted from the result array,
* this way the corresponding request will be resolved with `null`.
*/
fetch: (client: BaseTelegramClient, items: T[]) => Promise<U[]>
/** Key derivation function for input items */
inputKey: (item: T, client: BaseTelegramClient) => K
/** Key derivation function for output items */
outputKey: (item: U, client: BaseTelegramClient) => K
/**
* Maximum number of items to be passed to the `fetcher` function at once.
*
* It is recommended to pass ~half of the maximum allowed by the server,
* since in some cases failing on a single item will cause the whole batch to fail.
*
* @default Infinity
*/
maxBatchSize?: number
/**
* Maximum number of concurrently running queries.
*
* @default 1
*/
maxConcurrent?: number
/**
* In case of an error, we can try retrying the query for some items one-by-one,
* to avoid failing the whole batch.
*
* @param items Items contained in the batch that failed
* @param err Error that was thrown by the fetcher
* @returns `true` if the query should be retried for all items, `false` if it should be retried for none,
* or an array of items for which the query should be retried (waiters for other items will throw `err`).
*/
retrySingleOnError?: (items: T[], err: unknown) => boolean | T[]
}): (client: BaseTelegramClient, item: T) => Promise<U | null> {
const { inputKey, outputKey, fetch, maxBatchSize = Infinity, maxConcurrent = 1, retrySingleOnError } = params
const symbol = Symbol('batchedQueryState')
function getState(client_: BaseTelegramClient) {
const client = client_ as { [symbol]?: InternalState<K, U, T> }
if (!client[symbol]) {
client[symbol] = {
waiters: new Map(),
fetchingKeys: new Set(),
retryQueue: new Deque(),
numRunning: 0,
}
}
return client[symbol]
}
function addWaiter(client: BaseTelegramClient, waiters: WaitersMap<K, U, T>, item: T) {
const key = inputKey(item, client)
let arr = waiters.get(key)
if (!arr) {
arr = []
waiters.set(key, arr)
}
return new Promise<U | null>((resolve, reject) => {
arr!.push([item, resolve, reject])
})
}
function popWaiters(waiters: WaitersMap<K, U, T>, key: K) {
const arr = waiters.get(key)
if (!arr) return []
waiters.delete(key)
return arr
}
function startLoops(client: BaseTelegramClient, state: InternalState<K, U, T>) {
for (let i = state.numRunning; i <= maxConcurrent; i++) {
processPending(client, state)
}
}
function processPending(client: BaseTelegramClient, state: InternalState<K, U, T>) {
const { waiters, fetchingKeys, retryQueue } = state
if (state.numRunning >= maxConcurrent) return
const request: T[] = []
const requestKeys: K[] = []
let isRetryRequest = false
if (retryQueue.length > 0) {
// handle retries in the same loop so we can easily use the same concurrency pool
isRetryRequest = true
const it = retryQueue.popFront()!
request.push(it)
const key = inputKey(it, client)
requestKeys.push(key)
fetchingKeys.add(key)
} else {
for (const it of waiters.keys()) {
if (fetchingKeys.has(it)) continue
request.push(waiters.get(it)![0][0])
requestKeys.push(it)
fetchingKeys.add(it)
if (request.length === maxBatchSize) break
}
if (request.length === 0) return
}
state.numRunning += 1
// eslint-disable-next-line @typescript-eslint/no-floating-promises
fetch(client, request)
.then((res) => {
const receivedKeys = new Set<K>()
for (const it of res) {
const key = outputKey(it, client)
receivedKeys.add(key)
for (const waiter of popWaiters(waiters, key)) {
waiter[1](it)
}
fetchingKeys.delete(key)
}
for (const key of requestKeys) {
if (!receivedKeys.has(key)) {
for (const waiter of popWaiters(waiters, key)) {
waiter[1](null)
}
fetchingKeys.delete(key)
}
}
})
.catch((err: unknown) => {
if (retrySingleOnError && !isRetryRequest) {
const retry = retrySingleOnError(request, err)
if (retry === true) {
for (const it of request) {
retryQueue.pushBack(it)
}
return
} else if (Array.isArray(retry)) {
for (const req of retry) {
const requestKeysIdx = request.indexOf(req)
if (requestKeysIdx < 0) continue
retryQueue.pushBack(req)
// to avoid rejecting it below
request.splice(requestKeysIdx, 1)
requestKeys.splice(requestKeysIdx, 1)
}
}
}
for (const key of requestKeys) {
for (const waiter of popWaiters(waiters, key)) {
waiter[2](err)
}
fetchingKeys.delete(key)
}
})
.then(() => {
state.numRunning -= 1
if (waiters.size > 0) processPending(client, state)
})
}
return function (client, item) {
const state = getState(client)
const promise = addWaiter(client, state.waiters, item)
startLoops(client, state)
return promise
}
}