feat!: support min updates
breaking: changed `ITelegramStorage` interface, changed tl schema a bit
This commit is contained in:
parent
48411323af
commit
b25f9dddfa
26 changed files with 804 additions and 183 deletions
|
@ -1,11 +1,17 @@
|
|||
/* eslint-disable max-depth,max-params */
|
||||
import { assertNever, BaseTelegramClient, MtArgumentError, tl } from '@mtcute/core'
|
||||
import { assertNever, BaseTelegramClient, MaybeAsync, MtArgumentError, tl } from '@mtcute/core'
|
||||
import { getBarePeerId, getMarkedPeerId, markedPeerIdToBare, toggleChannelIdMark } from '@mtcute/core/utils.js'
|
||||
|
||||
import { PeersIndex } from '../../types/index.js'
|
||||
import { normalizeToInputChannel } from '../../utils/peer-utils.js'
|
||||
import {
|
||||
isInputPeerChannel,
|
||||
isInputPeerUser,
|
||||
normalizeToInputChannel,
|
||||
normalizeToInputUser,
|
||||
} from '../../utils/peer-utils.js'
|
||||
import { RpsMeter } from '../../utils/rps-meter.js'
|
||||
import { getAuthState } from '../auth/_state.js'
|
||||
import { _getChannelsBatched, _getUsersBatched } from '../chats/batched-queries.js'
|
||||
import { resolvePeer } from '../users/resolve-peer.js'
|
||||
import { createUpdatesState, PendingUpdate, toPendingUpdate, UpdatesManagerParams, UpdatesState } from './types.js'
|
||||
import { extractChannelIdFromUpdate, messageToUpdate } from './utils.js'
|
||||
|
@ -410,89 +416,58 @@ function addToNoDispatchIndex(state: UpdatesState, updates?: tl.TypeUpdates): vo
|
|||
}
|
||||
}
|
||||
|
||||
async function replaceMinPeers(client: BaseTelegramClient, peers: PeersIndex): Promise<boolean> {
|
||||
for (const [key, user_] of peers.users) {
|
||||
const user = user_ as Exclude<tl.TypeUser, tl.RawUserEmpty>
|
||||
|
||||
if (user.min) {
|
||||
const cached = await client.storage.getFullPeerById(user.id)
|
||||
if (!cached) return false
|
||||
peers.users.set(key, cached as tl.TypeUser)
|
||||
}
|
||||
}
|
||||
|
||||
for (const [key, chat_] of peers.chats) {
|
||||
const chat = chat_ as Extract<tl.TypeChat, { min?: boolean }>
|
||||
|
||||
if (chat.min) {
|
||||
let id: number
|
||||
|
||||
switch (chat._) {
|
||||
case 'channel':
|
||||
id = toggleChannelIdMark(chat.id)
|
||||
break
|
||||
default:
|
||||
id = -chat.id
|
||||
}
|
||||
|
||||
const cached = await client.storage.getFullPeerById(id)
|
||||
if (!cached) return false
|
||||
peers.chats.set(key, cached as tl.TypeChat)
|
||||
}
|
||||
}
|
||||
|
||||
peers.hasMin = false
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
async function fetchPeersForShort(
|
||||
async function fetchMissingPeers(
|
||||
client: BaseTelegramClient,
|
||||
upd: tl.TypeUpdate | tl.RawMessage | tl.RawMessageService,
|
||||
): Promise<PeersIndex | null> {
|
||||
const peers = new PeersIndex()
|
||||
upd: tl.TypeUpdate,
|
||||
peers: PeersIndex,
|
||||
allowMissing = false,
|
||||
): Promise<Set<number>> {
|
||||
const missing = new Set<number>()
|
||||
|
||||
const fetchPeer = async (peer?: tl.TypePeer | number) => {
|
||||
async function fetchPeer(peer?: tl.TypePeer | number) {
|
||||
if (!peer) return true
|
||||
|
||||
const bare = typeof peer === 'number' ? markedPeerIdToBare(peer) : getBarePeerId(peer)
|
||||
|
||||
const marked = typeof peer === 'number' ? peer : getMarkedPeerId(peer)
|
||||
const index = marked > 0 ? peers.chats : peers.users
|
||||
|
||||
if (index.has(bare)) return true
|
||||
if (missing.has(marked)) return false
|
||||
|
||||
const cached = await client.storage.getFullPeerById(marked)
|
||||
if (!cached) return false
|
||||
|
||||
if (marked > 0) {
|
||||
peers.users.set(bare, cached as tl.TypeUser)
|
||||
} else {
|
||||
peers.chats.set(bare, cached as tl.TypeChat)
|
||||
if (!cached) {
|
||||
missing.add(marked)
|
||||
|
||||
return allowMissing
|
||||
}
|
||||
|
||||
// whatever, ts is not smart enough to understand
|
||||
(index as Map<number, tl.TypeUser | tl.TypeChat>).set(bare, cached)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
switch (upd._) {
|
||||
// not really sure if these can be inside updateShort, but whatever
|
||||
case 'message':
|
||||
case 'messageService':
|
||||
case 'updateNewMessage':
|
||||
case 'updateNewChannelMessage':
|
||||
case 'updateEditMessage':
|
||||
case 'updateEditChannelMessage': {
|
||||
const msg = upd._ === 'message' || upd._ === 'messageService' ? upd : upd.message
|
||||
if (msg._ === 'messageEmpty') return null
|
||||
const msg = upd.message
|
||||
if (msg._ === 'messageEmpty') return missing
|
||||
|
||||
// ref: https://github.com/tdlib/td/blob/master/td/telegram/UpdatesManager.cpp
|
||||
// (search by UpdatesManager::is_acceptable_update)
|
||||
if (!(await fetchPeer(msg.peerId))) return null
|
||||
if (!(await fetchPeer(msg.fromId))) return null
|
||||
if (!(await fetchPeer(msg.peerId))) return missing
|
||||
if (!(await fetchPeer(msg.fromId))) return missing
|
||||
|
||||
if (msg.replyTo) {
|
||||
if (msg.replyTo._ === 'messageReplyHeader' && !(await fetchPeer(msg.replyTo.replyToPeerId))) {
|
||||
return null
|
||||
return missing
|
||||
}
|
||||
if (msg.replyTo._ === 'messageReplyStoryHeader' && !(await fetchPeer(msg.replyTo.userId))) {
|
||||
return null
|
||||
return missing
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -501,14 +476,14 @@ async function fetchPeersForShort(
|
|||
msg.fwdFrom &&
|
||||
(!(await fetchPeer(msg.fwdFrom.fromId)) || !(await fetchPeer(msg.fwdFrom.savedFromPeer)))
|
||||
) {
|
||||
return null
|
||||
return missing
|
||||
}
|
||||
if (!(await fetchPeer(msg.viaBotId))) return null
|
||||
if (!(await fetchPeer(msg.viaBotId))) return missing
|
||||
|
||||
if (msg.entities) {
|
||||
for (const ent of msg.entities) {
|
||||
if (ent._ === 'messageEntityMentionName') {
|
||||
if (!(await fetchPeer(ent.userId))) return null
|
||||
if (!(await fetchPeer(ent.userId))) return missing
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -517,7 +492,7 @@ async function fetchPeersForShort(
|
|||
switch (msg.media._) {
|
||||
case 'messageMediaContact':
|
||||
if (msg.media.userId && !(await fetchPeer(msg.media.userId))) {
|
||||
return null
|
||||
return missing
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -527,28 +502,28 @@ async function fetchPeersForShort(
|
|||
case 'messageActionChatAddUser':
|
||||
case 'messageActionInviteToGroupCall':
|
||||
for (const user of msg.action.users) {
|
||||
if (!(await fetchPeer(user))) return null
|
||||
if (!(await fetchPeer(user))) return missing
|
||||
}
|
||||
break
|
||||
case 'messageActionChatJoinedByLink':
|
||||
if (!(await fetchPeer(msg.action.inviterId))) {
|
||||
return null
|
||||
return missing
|
||||
}
|
||||
break
|
||||
case 'messageActionChatDeleteUser':
|
||||
if (!(await fetchPeer(msg.action.userId))) return null
|
||||
if (!(await fetchPeer(msg.action.userId))) return missing
|
||||
break
|
||||
case 'messageActionChatMigrateTo':
|
||||
if (!(await fetchPeer(toggleChannelIdMark(msg.action.channelId)))) {
|
||||
return null
|
||||
return missing
|
||||
}
|
||||
break
|
||||
case 'messageActionChannelMigrateFrom':
|
||||
if (!(await fetchPeer(-msg.action.chatId))) return null
|
||||
if (!(await fetchPeer(-msg.action.chatId))) return missing
|
||||
break
|
||||
case 'messageActionGeoProximityReached':
|
||||
if (!(await fetchPeer(msg.action.fromId))) return null
|
||||
if (!(await fetchPeer(msg.action.toId))) return null
|
||||
if (!(await fetchPeer(msg.action.fromId))) return missing
|
||||
if (!(await fetchPeer(msg.action.toId))) return missing
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -558,13 +533,97 @@ async function fetchPeersForShort(
|
|||
if ('entities' in upd.draft && upd.draft.entities) {
|
||||
for (const ent of upd.draft.entities) {
|
||||
if (ent._ === 'messageEntityMentionName') {
|
||||
if (!(await fetchPeer(ent.userId))) return null
|
||||
if (!(await fetchPeer(ent.userId))) return missing
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return peers
|
||||
return missing
|
||||
}
|
||||
|
||||
async function storeMessageReferences(client: BaseTelegramClient, msg: tl.TypeMessage): Promise<void> {
|
||||
if (msg._ === 'messageEmpty') return
|
||||
|
||||
const peerId = msg.peerId
|
||||
if (peerId._ !== 'peerChannel') return
|
||||
|
||||
const channelId = toggleChannelIdMark(peerId.channelId)
|
||||
|
||||
const promises: MaybeAsync<void>[] = []
|
||||
|
||||
function store(peer?: tl.TypePeer | number | number[]): void {
|
||||
if (!peer) return
|
||||
|
||||
if (Array.isArray(peer)) {
|
||||
peer.forEach(store)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
const marked = typeof peer === 'number' ? peer : getMarkedPeerId(peer)
|
||||
|
||||
promises.push(client.storage.saveReferenceMessage(marked, channelId, msg.id))
|
||||
}
|
||||
|
||||
// reference: https://github.com/tdlib/td/blob/master/td/telegram/MessagesManager.cpp
|
||||
// (search by get_message_user_ids, get_message_channel_ids)
|
||||
store(msg.fromId)
|
||||
|
||||
if (msg._ === 'message') {
|
||||
store(msg.viaBotId)
|
||||
store(msg.fwdFrom?.fromId)
|
||||
|
||||
if (msg.media) {
|
||||
switch (msg.media._) {
|
||||
case 'messageMediaWebPage':
|
||||
if (msg.media.webpage._ === 'webPage' && msg.media.webpage.attributes) {
|
||||
for (const attr of msg.media.webpage.attributes) {
|
||||
if (attr._ === 'webPageAttributeStory') {
|
||||
store(attr.peer)
|
||||
}
|
||||
}
|
||||
}
|
||||
break
|
||||
case 'messageMediaContact':
|
||||
store(msg.media.userId)
|
||||
break
|
||||
case 'messageMediaStory':
|
||||
store(msg.media.peer)
|
||||
break
|
||||
case 'messageMediaGiveaway':
|
||||
store(msg.media.channels.map(toggleChannelIdMark))
|
||||
break
|
||||
}
|
||||
}
|
||||
} else {
|
||||
switch (msg.action._) {
|
||||
case 'messageActionChatCreate':
|
||||
case 'messageActionChatAddUser':
|
||||
case 'messageActionInviteToGroupCall':
|
||||
store(msg.action.users)
|
||||
break
|
||||
case 'messageActionChatDeleteUser':
|
||||
store(msg.action.userId)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if (msg.replyTo) {
|
||||
switch (msg.replyTo._) {
|
||||
case 'messageReplyHeader':
|
||||
store(msg.replyTo.replyToPeerId)
|
||||
store(msg.replyTo.replyFrom?.fromId)
|
||||
break
|
||||
case 'messageReplyStoryHeader':
|
||||
store(msg.replyTo.userId)
|
||||
break
|
||||
}
|
||||
// in fact, we can also use peers contained in the replied-to message,
|
||||
// but we don't fetch it automatically, so we can't know which peers are there
|
||||
}
|
||||
|
||||
await Promise.all(promises)
|
||||
}
|
||||
|
||||
function isMessageEmpty(upd: tl.TypeUpdate): boolean {
|
||||
|
@ -615,8 +674,7 @@ async function fetchChannelDifference(
|
|||
state: UpdatesState,
|
||||
channelId: number,
|
||||
fallbackPts?: number,
|
||||
force = false,
|
||||
): Promise<void> {
|
||||
): Promise<boolean> {
|
||||
let _pts: number | null | undefined = state.cpts.get(channelId)
|
||||
|
||||
if (!_pts && state.catchUpChannels) {
|
||||
|
@ -627,17 +685,15 @@ async function fetchChannelDifference(
|
|||
if (!_pts) {
|
||||
state.log.debug('fetchChannelDifference failed for channel %d: base pts not available', channelId)
|
||||
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
let channel
|
||||
const channel = normalizeToInputChannel(await resolvePeer(client, toggleChannelIdMark(channelId)))
|
||||
|
||||
try {
|
||||
channel = normalizeToInputChannel(await resolvePeer(client, toggleChannelIdMark(channelId)))
|
||||
} catch (e) {
|
||||
state.log.warn('fetchChannelDifference failed for channel %d: input peer not found', channelId)
|
||||
if (channel._ === 'inputChannel' && channel.accessHash.isZero()) {
|
||||
state.log.debug('fetchChannelDifference failed for channel %d: input peer not found', channelId)
|
||||
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
// to make TS happy
|
||||
|
@ -652,7 +708,7 @@ async function fetchChannelDifference(
|
|||
for (;;) {
|
||||
const diff = await client.call({
|
||||
_: 'updates.getChannelDifference',
|
||||
force,
|
||||
force: true, // Set to true to skip some possibly unneeded updates and reduce server-side load
|
||||
channel,
|
||||
pts,
|
||||
limit,
|
||||
|
@ -688,7 +744,7 @@ async function fetchChannelDifference(
|
|||
|
||||
if (message._ === 'messageEmpty') return
|
||||
|
||||
state.pendingUnorderedUpdates.pushBack(toPendingUpdate(messageToUpdate(message), peers))
|
||||
state.pendingUnorderedUpdates.pushBack(toPendingUpdate(messageToUpdate(message), peers, true))
|
||||
})
|
||||
break
|
||||
}
|
||||
|
@ -707,11 +763,11 @@ async function fetchChannelDifference(
|
|||
|
||||
if (message._ === 'messageEmpty') return
|
||||
|
||||
state.pendingUnorderedUpdates.pushBack(toPendingUpdate(messageToUpdate(message), peers))
|
||||
state.pendingUnorderedUpdates.pushBack(toPendingUpdate(messageToUpdate(message), peers, true))
|
||||
})
|
||||
|
||||
diff.otherUpdates.forEach((upd) => {
|
||||
const parsed = toPendingUpdate(upd, peers)
|
||||
const parsed = toPendingUpdate(upd, peers, true)
|
||||
|
||||
state.log.debug(
|
||||
'processing %s from diff for channel %d, pts_before: %d, pts: %d',
|
||||
|
@ -733,6 +789,8 @@ async function fetchChannelDifference(
|
|||
|
||||
state.cpts.set(channelId, pts)
|
||||
state.cptsMod.set(channelId, pts)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
function fetchChannelDifferenceLater(
|
||||
|
@ -741,17 +799,21 @@ function fetchChannelDifferenceLater(
|
|||
requestedDiff: Map<number, Promise<void>>,
|
||||
channelId: number,
|
||||
fallbackPts?: number,
|
||||
force = false,
|
||||
): void {
|
||||
if (!requestedDiff.has(channelId)) {
|
||||
requestedDiff.set(
|
||||
channelId,
|
||||
fetchChannelDifference(client, state, channelId, fallbackPts, force)
|
||||
fetchChannelDifference(client, state, channelId, fallbackPts)
|
||||
.catch((err) => {
|
||||
state.log.warn('error fetching difference for %d: %s', channelId, err)
|
||||
})
|
||||
.then(() => {
|
||||
.then((ok) => {
|
||||
requestedDiff.delete(channelId)
|
||||
|
||||
if (!ok) {
|
||||
state.log.debug('channel difference for %d failed, falling back to common diff', channelId)
|
||||
fetchDifferenceLater(client, state, requestedDiff)
|
||||
}
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
@ -802,7 +864,7 @@ async function fetchDifference(
|
|||
if (message._ === 'messageEmpty') return
|
||||
|
||||
// pts does not need to be checked for them
|
||||
state.pendingUnorderedUpdates.pushBack(toPendingUpdate(messageToUpdate(message), peers))
|
||||
state.pendingUnorderedUpdates.pushBack(toPendingUpdate(messageToUpdate(message), peers, true))
|
||||
})
|
||||
|
||||
diff.otherUpdates.forEach((upd) => {
|
||||
|
@ -820,7 +882,7 @@ async function fetchDifference(
|
|||
|
||||
if (isMessageEmpty(upd)) return
|
||||
|
||||
const parsed = toPendingUpdate(upd, peers)
|
||||
const parsed = toPendingUpdate(upd, peers, true)
|
||||
|
||||
if (parsed.channelId && parsed.ptsBefore) {
|
||||
// we need to check pts for these updates, put into pts queue
|
||||
|
@ -870,6 +932,11 @@ function fetchDifferenceLater(
|
|||
}
|
||||
|
||||
state.log.warn('error fetching common difference: %s', err)
|
||||
|
||||
if (tl.RpcError.is(err, 'PERSISTENT_TIMESTAMP_INVALID')) {
|
||||
// this function never throws
|
||||
return fetchUpdatesState(client, state)
|
||||
}
|
||||
})
|
||||
.then(() => {
|
||||
requestedDiff.delete(0)
|
||||
|
@ -888,48 +955,44 @@ async function onUpdate(
|
|||
): Promise<void> {
|
||||
const upd = pending.update
|
||||
|
||||
// check for min peers, try to replace them
|
||||
let missing: Set<number> | undefined = undefined
|
||||
|
||||
// it is important to do this before updating pts
|
||||
if (pending.peers && pending.peers.hasMin) {
|
||||
if (!(await replaceMinPeers(client, pending.peers))) {
|
||||
if (pending.peers.hasMin || pending.peers.empty) {
|
||||
// even if we have min peers in difference, we can't do anything about them.
|
||||
// we still want to collect them, so we can fetch them in the background.
|
||||
// we won't wait for them, since that would block the updates loop
|
||||
|
||||
missing = await fetchMissingPeers(client, upd, pending.peers, pending.fromDifference)
|
||||
|
||||
if (!pending.fromDifference && missing.size) {
|
||||
state.log.debug(
|
||||
'fetching difference because some peers were min and not cached for %s (pts = %d, cid = %d)',
|
||||
'fetching difference because some peers were min (%J) and not cached for %s (pts = %d, cid = %d)',
|
||||
missing,
|
||||
upd._,
|
||||
pending.pts,
|
||||
pending.channelId,
|
||||
)
|
||||
|
||||
if (pending.channelId) {
|
||||
fetchChannelDifferenceLater(client, state, requestedDiff, pending.channelId)
|
||||
if (pending.channelId && !(upd._ === 'updateNewChannelMessage' && upd.message._ === 'messageService')) {
|
||||
// don't replace service messages, because they can be about bot's kicking
|
||||
fetchChannelDifferenceLater(client, state, requestedDiff, pending.channelId, pending.ptsBefore)
|
||||
} else {
|
||||
fetchDifferenceLater(client, state, requestedDiff)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if (!pending.peers) {
|
||||
// this is a short update, we need to fetch the peers
|
||||
const peers = await fetchPeersForShort(client, upd)
|
||||
|
||||
if (!peers) {
|
||||
if (missing.size) {
|
||||
state.log.debug(
|
||||
'fetching difference because some peers were not available for short %s (pts = %d, cid = %d)',
|
||||
'peers still missing after fetching difference: %J for %s (pts = %d, cid = %d)',
|
||||
missing,
|
||||
upd._,
|
||||
pending.pts,
|
||||
pending.channelId,
|
||||
)
|
||||
|
||||
if (pending.channelId) {
|
||||
fetchChannelDifferenceLater(client, state, requestedDiff, pending.channelId)
|
||||
} else {
|
||||
fetchDifferenceLater(client, state, requestedDiff)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
pending.peers = peers
|
||||
}
|
||||
|
||||
// apply new pts/qts, if applicable
|
||||
|
@ -988,7 +1051,7 @@ async function onUpdate(
|
|||
|
||||
// updates that are also used internally
|
||||
switch (upd._) {
|
||||
case 'dummyUpdate':
|
||||
case 'mtcute.dummyUpdate':
|
||||
// we just needed to apply new pts values
|
||||
return
|
||||
case 'updateDcOptions': {
|
||||
|
@ -1000,18 +1063,69 @@ async function onUpdate(
|
|||
dcOptions: upd.dcOptions,
|
||||
})
|
||||
} else {
|
||||
await client.network.config.update(true)
|
||||
client.network.config.update(true).catch((err) => client._emitError(err))
|
||||
}
|
||||
break
|
||||
}
|
||||
case 'updateConfig':
|
||||
await client.network.config.update(true)
|
||||
client.network.config.update(true).catch((err) => client._emitError(err))
|
||||
break
|
||||
case 'updateUserName':
|
||||
if (upd.userId === state.auth.userId) {
|
||||
state.auth.selfUsername = upd.usernames.find((it) => it.active)?.username ?? null
|
||||
}
|
||||
break
|
||||
case 'updateDeleteChannelMessages':
|
||||
if (!state.auth.isBot) {
|
||||
await client.storage.deleteReferenceMessages(toggleChannelIdMark(upd.channelId), upd.messages)
|
||||
}
|
||||
break
|
||||
case 'updateNewMessage':
|
||||
case 'updateEditMessage':
|
||||
case 'updateNewChannelMessage':
|
||||
case 'updateEditChannelMessage':
|
||||
if (!state.auth.isBot) {
|
||||
await storeMessageReferences(client, upd.message)
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
if (missing?.size) {
|
||||
if (state.auth.isBot) {
|
||||
state.log.warn(
|
||||
'missing peers (%J) after getDifference for %s (pts = %d, cid = %d)',
|
||||
missing,
|
||||
upd._,
|
||||
pending.pts,
|
||||
pending.channelId,
|
||||
)
|
||||
} else {
|
||||
// force save storage so the min peers are stored
|
||||
await client.storage.save?.()
|
||||
|
||||
for (const id of missing) {
|
||||
Promise.resolve(client.storage.getPeerById(id))
|
||||
.then((peer): unknown => {
|
||||
if (!peer) {
|
||||
state.log.warn('cannot fetch full peer %d - getPeerById returned null', id)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// the peer will be automatically cached by the `.call()`, we don't have to do anything
|
||||
if (isInputPeerChannel(peer)) {
|
||||
return _getChannelsBatched(client, normalizeToInputChannel(peer))
|
||||
} else if (isInputPeerUser(peer)) {
|
||||
return _getUsersBatched(client, normalizeToInputUser(peer))
|
||||
}
|
||||
|
||||
state.log.warn('cannot fetch full peer %d - unknown peer type %s', id, peer._)
|
||||
})
|
||||
.catch((err) => {
|
||||
state.log.warn('error fetching full peer %d: %s', id, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// dispatch the update
|
||||
|
@ -1035,10 +1149,6 @@ async function onUpdate(
|
|||
state.handler(upd, pending.peers)
|
||||
}
|
||||
|
||||
// todo: updateChannelTooLong with catchUpChannels disabled should not trigger getDifference (?)
|
||||
// todo: when min peer or similar use pts_before as base pts for channels
|
||||
// todo: fetchDiff when Session loss on the server: the client receives a new session created notification
|
||||
|
||||
async function updatesLoop(client: BaseTelegramClient, state: UpdatesState): Promise<void> {
|
||||
const { log } = state
|
||||
|
||||
|
@ -1125,14 +1235,35 @@ async function updatesLoop(client: BaseTelegramClient, state: UpdatesState): Pro
|
|||
const peers = PeersIndex.from(upd)
|
||||
|
||||
for (const update of upd.updates) {
|
||||
if (update._ === 'updateChannelTooLong') {
|
||||
log.debug(
|
||||
'received updateChannelTooLong for channel %d (pts = %d) in container, fetching diff',
|
||||
update.channelId,
|
||||
update.pts,
|
||||
)
|
||||
fetchChannelDifferenceLater(client, state, requestedDiff, update.channelId, update.pts)
|
||||
continue
|
||||
switch (update._) {
|
||||
case 'updateChannelTooLong':
|
||||
log.debug(
|
||||
'received updateChannelTooLong for channel %d (pts = %d) in container, fetching diff',
|
||||
update.channelId,
|
||||
update.pts,
|
||||
)
|
||||
fetchChannelDifferenceLater(
|
||||
client,
|
||||
state,
|
||||
requestedDiff,
|
||||
update.channelId,
|
||||
update.pts,
|
||||
)
|
||||
continue
|
||||
case 'updatePtsChanged':
|
||||
// see https://github.com/tdlib/td/blob/07c1d53a6d3cb1fad58d2822e55eef6d57363581/td/telegram/UpdatesManager.cpp#L4051
|
||||
if (client.network.getPoolSize('main') > 1) {
|
||||
// highload bot
|
||||
state.log.debug(
|
||||
'updatePtsChanged received, resetting pts to 1 and fetching difference',
|
||||
)
|
||||
state.pts = 1
|
||||
fetchDifferenceLater(client, state, requestedDiff)
|
||||
} else {
|
||||
state.log.debug('updatePtsChanged received, fetching updates state')
|
||||
await fetchUpdatesState(client, state)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
const parsed = toPendingUpdate(update, peers)
|
||||
|
@ -1156,7 +1287,7 @@ async function updatesLoop(client: BaseTelegramClient, state: UpdatesState): Pro
|
|||
case 'updateShort': {
|
||||
log.debug('received short %s', upd._)
|
||||
|
||||
const parsed = toPendingUpdate(upd.update)
|
||||
const parsed = toPendingUpdate(upd.update, new PeersIndex())
|
||||
|
||||
if (parsed.ptsBefore !== undefined) {
|
||||
state.pendingPtsUpdates.add(parsed)
|
||||
|
@ -1206,6 +1337,8 @@ async function updatesLoop(client: BaseTelegramClient, state: UpdatesState): Pro
|
|||
update,
|
||||
ptsBefore: upd.pts - upd.ptsCount,
|
||||
pts: upd.pts,
|
||||
peers: new PeersIndex(),
|
||||
fromDifference: false,
|
||||
})
|
||||
|
||||
break
|
||||
|
@ -1248,6 +1381,8 @@ async function updatesLoop(client: BaseTelegramClient, state: UpdatesState): Pro
|
|||
update,
|
||||
ptsBefore: upd.pts - upd.ptsCount,
|
||||
pts: upd.pts,
|
||||
peers: new PeersIndex(),
|
||||
fromDifference: false,
|
||||
})
|
||||
|
||||
break
|
||||
|
|
|
@ -76,7 +76,8 @@ export interface PendingUpdate {
|
|||
qts?: number
|
||||
qtsBefore?: number
|
||||
timeout?: number
|
||||
peers?: PeersIndex
|
||||
peers: PeersIndex
|
||||
fromDifference: boolean
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
|
@ -181,7 +182,7 @@ export function createUpdatesState(
|
|||
}
|
||||
}
|
||||
|
||||
export function toPendingUpdate(upd: tl.TypeUpdate, peers?: PeersIndex): PendingUpdate {
|
||||
export function toPendingUpdate(upd: tl.TypeUpdate, peers: PeersIndex, fromDifference = false): PendingUpdate {
|
||||
const channelId = extractChannelIdFromUpdate(upd) || 0
|
||||
const pts = 'pts' in upd ? upd.pts : undefined
|
||||
// eslint-disable-next-line no-nested-ternary
|
||||
|
@ -196,5 +197,6 @@ export function toPendingUpdate(upd: tl.TypeUpdate, peers?: PeersIndex): Pending
|
|||
qts,
|
||||
qtsBefore: qts ? qts - 1 : undefined,
|
||||
peers,
|
||||
fromDifference,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,6 +46,49 @@ describe('resolvePeer', () => {
|
|||
})
|
||||
})
|
||||
|
||||
it('should extract input peer from dummy min peers', async () => {
|
||||
const client = StubTelegramClient.offline()
|
||||
|
||||
await client.registerPeers(
|
||||
createStub('channel', {
|
||||
id: 456,
|
||||
accessHash: Long.fromBits(111, 222),
|
||||
}),
|
||||
)
|
||||
await client.storage.saveReferenceMessage(123, -1000000000456, 789)
|
||||
await client.storage.saveReferenceMessage(-1000000000123, -1000000000456, 789)
|
||||
|
||||
const resolved = await resolvePeer(client, {
|
||||
_: 'mtcute.dummyInputPeerMinUser',
|
||||
userId: 123,
|
||||
})
|
||||
const resolved2 = await resolvePeer(client, {
|
||||
_: 'mtcute.dummyInputPeerMinChannel',
|
||||
channelId: 123,
|
||||
})
|
||||
|
||||
expect(resolved).toEqual({
|
||||
_: 'inputPeerUserFromMessage',
|
||||
userId: 123,
|
||||
peer: {
|
||||
_: 'inputPeerChannel',
|
||||
channelId: 456,
|
||||
accessHash: Long.fromBits(111, 222),
|
||||
},
|
||||
msgId: 789,
|
||||
})
|
||||
expect(resolved2).toEqual({
|
||||
_: 'inputPeerChannelFromMessage',
|
||||
channelId: 123,
|
||||
peer: {
|
||||
_: 'inputPeerChannel',
|
||||
channelId: 456,
|
||||
accessHash: Long.fromBits(111, 222),
|
||||
},
|
||||
msgId: 789,
|
||||
})
|
||||
})
|
||||
|
||||
it('should return inputPeerSelf for me/self', async () => {
|
||||
expect(await resolvePeer(StubTelegramClient.offline(), 'me')).toEqual({ _: 'inputPeerSelf' })
|
||||
expect(await resolvePeer(StubTelegramClient.offline(), 'self')).toEqual({ _: 'inputPeerSelf' })
|
||||
|
@ -72,6 +115,31 @@ describe('resolvePeer', () => {
|
|||
})
|
||||
})
|
||||
|
||||
it('should try checking for message references in storage', async () => {
|
||||
const client = StubTelegramClient.offline()
|
||||
|
||||
await client.registerPeers(
|
||||
createStub('channel', {
|
||||
id: 456,
|
||||
accessHash: Long.fromBits(111, 222),
|
||||
}),
|
||||
)
|
||||
await client.storage.saveReferenceMessage(123, -1000000000456, 789)
|
||||
|
||||
const resolved = await resolvePeer(client, 123)
|
||||
|
||||
expect(resolved).toEqual({
|
||||
_: 'inputPeerUserFromMessage',
|
||||
userId: 123,
|
||||
peer: {
|
||||
_: 'inputPeerChannel',
|
||||
channelId: 456,
|
||||
accessHash: Long.fromBits(111, 222),
|
||||
},
|
||||
msgId: 789,
|
||||
})
|
||||
})
|
||||
|
||||
it('should return user with zero hash if not in storage', async () => {
|
||||
const client = new StubTelegramClient()
|
||||
|
||||
|
@ -105,6 +173,31 @@ describe('resolvePeer', () => {
|
|||
})
|
||||
})
|
||||
|
||||
it('should try checking for message references in storage', async () => {
|
||||
const client = StubTelegramClient.offline()
|
||||
|
||||
await client.registerPeers(
|
||||
createStub('channel', {
|
||||
id: 456,
|
||||
accessHash: Long.fromBits(111, 222),
|
||||
}),
|
||||
)
|
||||
await client.storage.saveReferenceMessage(-1000000000123, -1000000000456, 789)
|
||||
|
||||
const resolved = await resolvePeer(client, -1000000000123)
|
||||
|
||||
expect(resolved).toEqual({
|
||||
_: 'inputPeerChannelFromMessage',
|
||||
channelId: 123,
|
||||
peer: {
|
||||
_: 'inputPeerChannel',
|
||||
channelId: 456,
|
||||
accessHash: Long.fromBits(111, 222),
|
||||
},
|
||||
msgId: 789,
|
||||
})
|
||||
})
|
||||
|
||||
it('should return channel with zero hash if not in storage', async () => {
|
||||
const client = new StubTelegramClient()
|
||||
|
||||
|
@ -119,7 +212,7 @@ describe('resolvePeer', () => {
|
|||
})
|
||||
|
||||
describe('chats', () => {
|
||||
it('should always return zero hash', async () => {
|
||||
it('should correctly resolve', async () => {
|
||||
const client = StubTelegramClient.offline()
|
||||
|
||||
const resolved = await resolvePeer(client, -123)
|
||||
|
|
|
@ -31,9 +31,22 @@ export async function resolvePeer(
|
|||
peerId = getMarkedPeerId(peerId)
|
||||
} else if ('inputPeer' in peerId) {
|
||||
// User | Chat
|
||||
return peerId.inputPeer
|
||||
peerId = peerId.inputPeer
|
||||
} else {
|
||||
return normalizeToInputPeer(peerId)
|
||||
peerId = normalizeToInputPeer(peerId)
|
||||
}
|
||||
}
|
||||
|
||||
if (typeof peerId === 'object') {
|
||||
switch (peerId._) {
|
||||
case 'mtcute.dummyInputPeerMinUser':
|
||||
peerId = peerId.userId
|
||||
break
|
||||
case 'mtcute.dummyInputPeerMinChannel':
|
||||
peerId = toggleChannelIdMark(peerId.channelId)
|
||||
break
|
||||
default:
|
||||
return peerId
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -57,11 +57,53 @@ export class Chat {
|
|||
}
|
||||
|
||||
/**
|
||||
* Chat's input peer
|
||||
* Whether this chat's information is incomplete.
|
||||
*
|
||||
* This usually only happens in large chats, where
|
||||
* the server sometimes sends only a part of the chat's
|
||||
* information. Basic info like name and profile photo
|
||||
* are always available, but other fields may be omitted
|
||||
* despite being available.
|
||||
*
|
||||
* It was observed that these fields may be missing:
|
||||
* - `isMember`
|
||||
* - and probably more
|
||||
*
|
||||
* This currently only ever happens for non-bot users, so if you are building
|
||||
* a normal bot, you can safely ignore this field.
|
||||
*
|
||||
* To fetch the "complete" user information, use one of these methods:
|
||||
* - {@link TelegramClient.getChat}
|
||||
* - {@link TelegramClient.getFullChat}.
|
||||
*
|
||||
* Learn more: [Incomplete peers](https://mtcute.dev/guide/topics/peers.html#incomplete-peers)
|
||||
*/
|
||||
get isMin(): boolean {
|
||||
// avoid additional runtime checks
|
||||
return Boolean((this.peer as { min?: boolean }).min)
|
||||
}
|
||||
|
||||
/**
|
||||
* Chat's input peer for advanced use-cases.
|
||||
*
|
||||
* > **Note**: for {@link min} chats, this method will return
|
||||
* > `mtcute.dummyInputPeerMin*`, which are actually not a valid input peer,
|
||||
* > These are used to indicate that the user is incomplete, and a message
|
||||
* > reference is needed to resolve the peer.
|
||||
* >
|
||||
* > Such objects are handled by {@link TelegramClient.resolvePeer} method,
|
||||
* so prefer using it whenever you need an input peer.
|
||||
*/
|
||||
get inputPeer(): tl.TypeInputPeer {
|
||||
switch (this.peer._) {
|
||||
case 'user':
|
||||
if (this.peer.min) {
|
||||
return {
|
||||
_: 'mtcute.dummyInputPeerMinUser',
|
||||
userId: this.peer.id,
|
||||
}
|
||||
}
|
||||
|
||||
if (!this.peer.accessHash) {
|
||||
throw new MtArgumentError("Peer's access hash is not available!")
|
||||
}
|
||||
|
@ -79,6 +121,13 @@ export class Chat {
|
|||
}
|
||||
case 'channel':
|
||||
case 'channelForbidden':
|
||||
if ((this.peer as tl.RawChannel).min) {
|
||||
return {
|
||||
_: 'mtcute.dummyInputPeerMinChannel',
|
||||
channelId: this.peer.id,
|
||||
}
|
||||
}
|
||||
|
||||
if (!this.peer.accessHash) {
|
||||
throw new MtArgumentError("Peer's access hash is not available!")
|
||||
}
|
||||
|
@ -188,6 +237,23 @@ export class Chat {
|
|||
return this.peer._ === 'channel' && this.peer.forum!
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether the current user is a member of the chat.
|
||||
*
|
||||
* For users, this is always `true`.
|
||||
*/
|
||||
get isMember(): boolean {
|
||||
switch (this.peer._) {
|
||||
case 'user':
|
||||
return true
|
||||
case 'channel':
|
||||
case 'chat':
|
||||
return !this.peer.left
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
/** Whether you have hidden (arhived) this chat's stories */
|
||||
get storiesHidden(): boolean {
|
||||
return 'storiesHidden' in this.peer ? this.peer.storiesHidden! : false
|
||||
|
|
|
@ -36,6 +36,10 @@ export class PeersIndex {
|
|||
return index
|
||||
}
|
||||
|
||||
get empty(): boolean {
|
||||
return this.users.size === 0 && this.chats.size === 0
|
||||
}
|
||||
|
||||
user(id: number): tl.TypeUser {
|
||||
const r = this.users.get(id)
|
||||
|
||||
|
|
|
@ -33,6 +33,21 @@ describe('User', () => {
|
|||
|
||||
expect(() => user.inputPeer).toThrow()
|
||||
})
|
||||
|
||||
it('should return correct input peer for min', () => {
|
||||
const user = new User(
|
||||
createStub('user', {
|
||||
id: 123,
|
||||
accessHash: Long.fromBits(456, 789),
|
||||
min: true,
|
||||
}),
|
||||
)
|
||||
|
||||
expect(user.inputPeer).toEqual({
|
||||
_: 'mtcute.dummyInputPeerMinUser',
|
||||
userId: 123,
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('status', () => {
|
||||
|
|
|
@ -45,6 +45,36 @@ export class User {
|
|||
return this.raw.id
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether this user's information is incomplete.
|
||||
*
|
||||
* This usually only happens in large chats, where
|
||||
* the server sometimes sends only a part of the user's
|
||||
* information. Basic info like name and profile photo
|
||||
* are always available, but other fields may be omitted
|
||||
* despite being available.
|
||||
*
|
||||
* It was observed that these fields may be missing:
|
||||
* - `username, usernames`
|
||||
* - `status, lastOnline, nextOffline`
|
||||
* - `storiesMaxId`
|
||||
* - `photo` - in some cases when user has some some privacy settings
|
||||
* - and probably more
|
||||
*
|
||||
* This currently only ever happens for non-bot users, so if you are building
|
||||
* a normal bot, you can safely ignore this field.
|
||||
*
|
||||
* To fetch the "complete" user information, use one of these methods:
|
||||
* - {@link TelegramClient.getUsers}
|
||||
* - {@link TelegramClient.getChat}
|
||||
* - {@link TelegramClient.getFullChat}.
|
||||
*
|
||||
* Learn more: [Incomplete peers](https://mtcute.dev/guide/topics/peers.html#incomplete-peers)
|
||||
*/
|
||||
get isMin(): boolean {
|
||||
return this.raw.min!
|
||||
}
|
||||
|
||||
/** Whether this user is you yourself */
|
||||
get isSelf(): boolean {
|
||||
return this.raw.self!
|
||||
|
@ -260,8 +290,23 @@ export class User {
|
|||
|
||||
/**
|
||||
* Get this user's input peer for advanced use-cases.
|
||||
*
|
||||
* > **Note**: for {@link min} users, this method will return
|
||||
* > `mtcute.dummyInputPeerMinUser`, which is actually not a valid input peer.
|
||||
* > These are used to indicate that the user is incomplete, and a message
|
||||
* > reference is needed to resolve the peer.
|
||||
* >
|
||||
* > Such objects are handled by {@link TelegramClient.resolvePeer} method,
|
||||
* > so prefer using it whenever you need an input peer.
|
||||
*/
|
||||
get inputPeer(): tl.TypeInputPeer {
|
||||
if (this.raw.min) {
|
||||
return {
|
||||
_: 'mtcute.dummyInputPeerMinUser',
|
||||
userId: this.raw.id,
|
||||
}
|
||||
}
|
||||
|
||||
if (!this.raw.accessHash) {
|
||||
throw new MtArgumentError("user's access hash is not available!")
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ export function createDummyUpdate(pts: number, ptsCount: number, channelId = 0):
|
|||
users: [],
|
||||
updates: [
|
||||
{
|
||||
_: 'dummyUpdate',
|
||||
_: 'mtcute.dummyUpdate',
|
||||
channelId,
|
||||
pts,
|
||||
ptsCount,
|
||||
|
|
|
@ -494,21 +494,15 @@ export class BaseTelegramClient extends EventEmitter {
|
|||
|
||||
/**
|
||||
* Adds all peers from a given object to entity cache in storage.
|
||||
*
|
||||
* @returns `true` if there were any `min` peers
|
||||
*/
|
||||
async _cachePeersFrom(obj: object): Promise<boolean> {
|
||||
async _cachePeersFrom(obj: object): Promise<void> {
|
||||
const parsedPeers: ITelegramStorage.PeerInfo[] = []
|
||||
|
||||
let hadMin = false
|
||||
let count = 0
|
||||
|
||||
for (const peer of getAllPeersFrom(obj as tl.TlObject)) {
|
||||
if ((peer as any).min) {
|
||||
// absolutely incredible min peer handling, courtesy of levlam.
|
||||
// see this thread: https://t.me/tdlibchat/15084
|
||||
hadMin = true
|
||||
this.log.debug('received min peer: %j', peer)
|
||||
// no point in caching min peers as we can't use them
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -560,8 +554,6 @@ export class BaseTelegramClient extends EventEmitter {
|
|||
await this.storage.updatePeers(parsedPeers)
|
||||
this.log.debug('cached %d peers', count)
|
||||
}
|
||||
|
||||
return hadMin
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -533,7 +533,7 @@ export class NetworkManager {
|
|||
throw new MtArgumentError('DC manager already exists')
|
||||
}
|
||||
|
||||
const dc = new DcConnectionManager(this, defaultDcs.main.id, defaultDcs)
|
||||
const dc = new DcConnectionManager(this, defaultDcs.main.id, defaultDcs, true)
|
||||
this._dcConnections.set(defaultDcs.main.id, dc)
|
||||
await this._switchPrimaryDc(dc)
|
||||
}
|
||||
|
@ -619,7 +619,7 @@ export class NetworkManager {
|
|||
const options = await this._findDcOptions(newDc)
|
||||
|
||||
if (!this._dcConnections.has(newDc)) {
|
||||
this._dcConnections.set(newDc, new DcConnectionManager(this, newDc, options))
|
||||
this._dcConnections.set(newDc, new DcConnectionManager(this, newDc, options, true))
|
||||
}
|
||||
|
||||
await this._storage.setDefaultDcs(options)
|
||||
|
|
|
@ -312,7 +312,6 @@ export class SessionConnection extends PersistentConnection {
|
|||
this._isPfsBindingPending = true
|
||||
}
|
||||
|
||||
process.exit(0)
|
||||
doAuthorization(this, this._crypto, TEMP_AUTH_KEY_EXPIRY)
|
||||
.then(async ([tempAuthKey, tempServerSalt]) => {
|
||||
if (!this._usePfs) {
|
||||
|
|
|
@ -135,19 +135,42 @@ export interface ITelegramStorage {
|
|||
* are called, so you can safely batch these updates
|
||||
*/
|
||||
updatePeers(peers: ITelegramStorage.PeerInfo[]): MaybeAsync<void>
|
||||
|
||||
/**
|
||||
* Find a peer in local database by its marked ID
|
||||
*
|
||||
* If no peer was found, the storage should try searching its
|
||||
* reference messages database. If a reference message is found,
|
||||
* a `inputPeer*FromMessage` constructor should be returned
|
||||
*/
|
||||
getPeerById(peerId: number): MaybeAsync<tl.TypeInputPeer | null>
|
||||
|
||||
/**
|
||||
* Find a peer in local database by its username
|
||||
*/
|
||||
getPeerByUsername(username: string): MaybeAsync<tl.TypeInputPeer | null>
|
||||
|
||||
/**
|
||||
* Find a peer in local database by its phone number
|
||||
*/
|
||||
getPeerByPhone(phone: string): MaybeAsync<tl.TypeInputPeer | null>
|
||||
|
||||
/**
|
||||
* For `*FromMessage` constructors: store a reference to a `peerId` -
|
||||
* it was seen in message `messageId` in chat `chatId`.
|
||||
*
|
||||
* `peerId` and `chatId` are marked peer IDs.
|
||||
*
|
||||
* Learn more: https://core.telegram.org/api/min
|
||||
*/
|
||||
saveReferenceMessage(peerId: number, chatId: number, messageId: number): MaybeAsync<void>
|
||||
|
||||
/**
|
||||
* For `*FromMessage` constructors: messages `messageIds` in chat `chatId` were deleted,
|
||||
* so remove any stored peer references to them.
|
||||
*/
|
||||
deleteReferenceMessages(chatId: number, messageIds: number[]): MaybeAsync<void>
|
||||
|
||||
/**
|
||||
* Get updates state (if available), represented as a tuple
|
||||
* containing: `pts, qts, date, seq`
|
||||
|
|
|
@ -31,6 +31,7 @@ export class JsonMemoryStorage extends MemoryStorage {
|
|||
case 'pts':
|
||||
case 'fsm':
|
||||
case 'rl':
|
||||
case 'refs':
|
||||
return new Map(Object.entries(value as Record<string, string>))
|
||||
case 'entities':
|
||||
return new Map()
|
||||
|
|
|
@ -14,7 +14,7 @@ describe('MemoryStorage', () => {
|
|||
constructor() {
|
||||
super()
|
||||
this._setStateFrom({
|
||||
$version: 1,
|
||||
$version: 2,
|
||||
defaultDcs: null,
|
||||
authKeys: new Map(),
|
||||
authKeysTemp: new Map(),
|
||||
|
@ -26,6 +26,7 @@ describe('MemoryStorage', () => {
|
|||
pts: new Map(),
|
||||
fsm: new Map(),
|
||||
rl: new Map(),
|
||||
refs: new Map(),
|
||||
self: null,
|
||||
})
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ import { tl } from '@mtcute/tl'
|
|||
import { LruMap, toggleChannelIdMark } from '../utils/index.js'
|
||||
import { ITelegramStorage } from './abstract.js'
|
||||
|
||||
const CURRENT_VERSION = 1
|
||||
const CURRENT_VERSION = 2
|
||||
|
||||
type PeerInfoWithUpdated = ITelegramStorage.PeerInfo & { updated: number }
|
||||
|
||||
|
@ -23,6 +23,9 @@ export interface MemorySessionState {
|
|||
// username -> peer id
|
||||
usernameIndex: Map<string, number>
|
||||
|
||||
// reference messages. peer id -> `${chat id}:${msg id}][]
|
||||
refs: Map<number, Set<string>>
|
||||
|
||||
// common pts, date, seq, qts
|
||||
gpts: [number, number, number, number] | null
|
||||
// channel pts
|
||||
|
@ -110,12 +113,15 @@ export class MemoryStorage implements ITelegramStorage {
|
|||
entities: new Map(),
|
||||
phoneIndex: new Map(),
|
||||
usernameIndex: new Map(),
|
||||
refs: new Map(),
|
||||
gpts: null,
|
||||
pts: new Map(),
|
||||
fsm: new Map(),
|
||||
rl: new Map(),
|
||||
self: null,
|
||||
}
|
||||
this._cachedInputPeers?.clear()
|
||||
this._cachedFull?.clear()
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -125,7 +131,14 @@ export class MemoryStorage implements ITelegramStorage {
|
|||
* you plan on using it somewhere else, be sure to copy it beforehand.
|
||||
*/
|
||||
protected _setStateFrom(obj: MemorySessionState): void {
|
||||
if (obj.$version !== CURRENT_VERSION) return
|
||||
let ver = obj.$version as number
|
||||
|
||||
if (ver === 1) {
|
||||
// v2: introduced message references
|
||||
obj.refs = new Map()
|
||||
obj.$version = ver = 2
|
||||
}
|
||||
if (ver !== CURRENT_VERSION) return
|
||||
|
||||
// populate indexes if needed
|
||||
let populate = false
|
||||
|
@ -252,6 +265,11 @@ export class MemoryStorage implements ITelegramStorage {
|
|||
if (peer.phone) this._state.phoneIndex.set(peer.phone, peer.id)
|
||||
|
||||
this._state.entities.set(peer.id, peer)
|
||||
|
||||
// no point in storing references anymore, since we have the full peer
|
||||
if (this._state.refs.has(peer.id)) {
|
||||
this._state.refs.delete(peer.id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -279,11 +297,42 @@ export class MemoryStorage implements ITelegramStorage {
|
|||
}
|
||||
}
|
||||
|
||||
private _findPeerByRef(peerId: number): tl.TypeInputPeer | null {
|
||||
const refs = this._state.refs.get(peerId)
|
||||
if (!refs || refs.size === 0) return null
|
||||
|
||||
const [ref] = refs.values()
|
||||
const [chatId, msgId] = ref.split(':').map(Number)
|
||||
|
||||
const chatPeer = this._getInputPeer(this._state.entities.get(chatId))
|
||||
if (!chatPeer) return null
|
||||
|
||||
if (peerId > 0) {
|
||||
// user
|
||||
return {
|
||||
_: 'inputPeerUserFromMessage',
|
||||
msgId,
|
||||
userId: peerId,
|
||||
peer: chatPeer,
|
||||
}
|
||||
}
|
||||
|
||||
// channel
|
||||
return {
|
||||
_: 'inputPeerChannelFromMessage',
|
||||
msgId,
|
||||
channelId: toggleChannelIdMark(peerId),
|
||||
peer: chatPeer,
|
||||
}
|
||||
}
|
||||
|
||||
getPeerById(peerId: number): tl.TypeInputPeer | null {
|
||||
if (this._cachedInputPeers.has(peerId)) {
|
||||
return this._cachedInputPeers.get(peerId)!
|
||||
}
|
||||
const peer = this._getInputPeer(this._state.entities.get(peerId))
|
||||
|
||||
let peer = this._getInputPeer(this._state.entities.get(peerId))
|
||||
if (!peer) peer = this._findPeerByRef(peerId)
|
||||
if (peer) this._cachedInputPeers.set(peerId, peer)
|
||||
|
||||
return peer
|
||||
|
@ -307,6 +356,23 @@ export class MemoryStorage implements ITelegramStorage {
|
|||
return this._getInputPeer(peer)
|
||||
}
|
||||
|
||||
saveReferenceMessage(peerId: number, chatId: number, messageId: number): void {
|
||||
if (!this._state.refs.has(peerId)) {
|
||||
this._state.refs.set(peerId, new Set())
|
||||
}
|
||||
|
||||
this._state.refs.get(peerId)!.add(`${chatId}:${messageId}`)
|
||||
}
|
||||
|
||||
deleteReferenceMessages(chatId: number, messageIds: number[]): void {
|
||||
// not the most efficient way, but it's fine
|
||||
for (const refs of this._state.refs.values()) {
|
||||
for (const msg of messageIds) {
|
||||
refs.delete(`${chatId}:${msg}`)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
getSelf(): ITelegramStorage.SelfInfo | null {
|
||||
return this._state.self
|
||||
}
|
||||
|
|
|
@ -70,6 +70,7 @@ export function getMarkedPeerId(
|
|||
}
|
||||
|
||||
switch (peer._) {
|
||||
case 'mtcute.dummyInputPeerMinUser':
|
||||
case 'peerUser':
|
||||
case 'inputPeerUser':
|
||||
case 'inputPeerUserFromMessage':
|
||||
|
@ -79,6 +80,7 @@ export function getMarkedPeerId(
|
|||
case 'peerChat':
|
||||
case 'inputPeerChat':
|
||||
return -peer.chatId
|
||||
case 'mtcute.dummyInputPeerMinChannel':
|
||||
case 'peerChannel':
|
||||
case 'inputPeerChannel':
|
||||
case 'inputPeerChannelFromMessage':
|
||||
|
@ -87,7 +89,7 @@ export function getMarkedPeerId(
|
|||
return ZERO_CHANNEL_ID - peer.channelId
|
||||
}
|
||||
|
||||
throw new MtArgumentError('Invalid peer')
|
||||
throw new MtArgumentError(`Invalid peer: ${peer._}`)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1,4 +1,12 @@
|
|||
import { Message, OmitInputMessageId, ParametersSkip1, TelegramClient } from '@mtcute/client'
|
||||
import {
|
||||
Chat,
|
||||
Message,
|
||||
MtPeerNotFoundError,
|
||||
OmitInputMessageId,
|
||||
ParametersSkip1,
|
||||
TelegramClient,
|
||||
User,
|
||||
} from '@mtcute/client'
|
||||
import { DeleteMessagesParams } from '@mtcute/client/src/methods/messages/delete-messages.js'
|
||||
import { ForwardMessageOptions } from '@mtcute/client/src/methods/messages/forward-messages.js'
|
||||
import { SendCopyParams } from '@mtcute/client/src/methods/messages/send-copy.js'
|
||||
|
@ -40,6 +48,29 @@ export class MessageContext extends Message implements UpdateContext<Message> {
|
|||
this.isMessageGroup = Array.isArray(message)
|
||||
}
|
||||
|
||||
/**
|
||||
* Get complete information about {@link sender}
|
||||
*
|
||||
* Learn more: [Incomplete peers](https://mtcute.dev/guide/topics/peers.html#incomplete-peers)
|
||||
*/
|
||||
async getSender(): Promise<User | Chat> {
|
||||
if (!this.sender.isMin) return this.sender
|
||||
|
||||
let res
|
||||
|
||||
if (this.sender.type === 'user') {
|
||||
[res] = await this.client.getUsers(this.sender)
|
||||
} else {
|
||||
res = await this.client.getChat(this.sender)
|
||||
}
|
||||
|
||||
if (!res) throw new MtPeerNotFoundError('Failed to fetch sender')
|
||||
|
||||
Object.defineProperty(this, 'sender', { value: res })
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
/** Get a message that this message is a reply to */
|
||||
getReplyTo() {
|
||||
return this.client.getReplyTo(this)
|
||||
|
|
|
@ -232,3 +232,23 @@ export const replyTo =
|
|||
|
||||
return filter(reply, state)
|
||||
}
|
||||
|
||||
/**
|
||||
* Middleware-like filter that will fetch the sender of the message
|
||||
* and make it available to further filters, as well as the handler itself.
|
||||
*/
|
||||
export const withCompleteSender =
|
||||
<Mod, State extends object>(
|
||||
filter?: UpdateFilter<MessageContext, Mod, State>,
|
||||
): UpdateFilter<MessageContext, Mod, State> =>
|
||||
async (msg, state) => {
|
||||
try {
|
||||
await msg.getSender()
|
||||
} catch (e) {
|
||||
return false
|
||||
}
|
||||
|
||||
if (!filter) return true
|
||||
|
||||
return filter(msg, state)
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ function getInputPeer(row: SqliteEntity | ITelegramStorage.PeerInfo): tl.TypeInp
|
|||
throw new Error(`Invalid peer type: ${row.type}`)
|
||||
}
|
||||
|
||||
const CURRENT_VERSION = 4
|
||||
const CURRENT_VERSION = 5
|
||||
|
||||
// language=SQLite format=false
|
||||
const TEMP_AUTH_TABLE = `
|
||||
|
@ -57,6 +57,16 @@ const TEMP_AUTH_TABLE = `
|
|||
);
|
||||
`
|
||||
|
||||
// language=SQLite format=false
|
||||
const MESSAGE_REFS_TABLE = `
|
||||
create table message_refs (
|
||||
peer_id integer primary key,
|
||||
chat_id integer not null,
|
||||
msg_id integer not null
|
||||
);
|
||||
create index idx_message_refs on message_refs (chat_id, msg_id);
|
||||
`
|
||||
|
||||
// language=SQLite format=false
|
||||
const SCHEMA = `
|
||||
create table kv (
|
||||
|
@ -93,6 +103,8 @@ const SCHEMA = `
|
|||
);
|
||||
create index idx_entities_username on entities (username);
|
||||
create index idx_entities_phone on entities (phone);
|
||||
|
||||
${MESSAGE_REFS_TABLE}
|
||||
`
|
||||
|
||||
// language=SQLite format=false
|
||||
|
@ -100,7 +112,8 @@ const RESET = `
|
|||
delete from kv where key <> 'ver';
|
||||
delete from state;
|
||||
delete from pts;
|
||||
delete from entities
|
||||
delete from entities;
|
||||
delete from message_refs;
|
||||
`
|
||||
const RESET_AUTH_KEYS = `
|
||||
delete from auth_keys;
|
||||
|
@ -129,6 +142,12 @@ interface FsmItem<T = unknown> {
|
|||
expires?: number
|
||||
}
|
||||
|
||||
interface MessageRef {
|
||||
peer_id: number
|
||||
chat_id: number
|
||||
msg_id: number
|
||||
}
|
||||
|
||||
const STATEMENTS = {
|
||||
getKv: 'select value from kv where key = ?',
|
||||
setKv: 'insert or replace into kv (key, value) values (?, ?)',
|
||||
|
@ -157,6 +176,11 @@ const STATEMENTS = {
|
|||
getEntByPhone: 'select * from entities where phone = ? limit 1',
|
||||
getEntByUser: 'select * from entities where username = ? limit 1',
|
||||
|
||||
storeMessageRef: 'insert or replace into message_refs (peer_id, chat_id, msg_id) values (?, ?, ?)',
|
||||
getMessageRef: 'select chat_id, msg_id from message_refs where peer_id = ?',
|
||||
delMessageRefs: 'delete from message_refs where chat_id = ? and msg_id = ?',
|
||||
delAllMessageRefs: 'delete from message_refs where peer_id = ?',
|
||||
|
||||
delStaleState: 'delete from state where expires < ?',
|
||||
} as const
|
||||
|
||||
|
@ -312,8 +336,6 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage*/ {
|
|||
)
|
||||
|
||||
this._vacuumInterval = params?.vacuumInterval ?? 300_000
|
||||
|
||||
// todo: add support for workers (idk if really needed, but still)
|
||||
}
|
||||
|
||||
setup(log: Logger, readerMap: TlReaderMap, writerMap: TlWriterMap): void {
|
||||
|
@ -393,6 +415,12 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage*/ {
|
|||
from = 4
|
||||
}
|
||||
|
||||
if (from === 4) {
|
||||
// message references support added
|
||||
this._db.exec(MESSAGE_REFS_TABLE)
|
||||
from = 5
|
||||
}
|
||||
|
||||
if (from !== CURRENT_VERSION) {
|
||||
// an assertion just in case i messed up
|
||||
throw new Error('Migration incomplete')
|
||||
|
@ -532,11 +560,19 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage*/ {
|
|||
this._statements.delAllAuthTemp.run(dcId)
|
||||
}
|
||||
|
||||
private _cachedSelf?: ITelegramStorage.SelfInfo | null
|
||||
getSelf(): ITelegramStorage.SelfInfo | null {
|
||||
return this._getFromKv('self')
|
||||
if (this._cachedSelf !== undefined) return this._cachedSelf
|
||||
|
||||
const self = this._getFromKv<ITelegramStorage.SelfInfo | null>('self')
|
||||
this._cachedSelf = self
|
||||
|
||||
return self
|
||||
}
|
||||
|
||||
setSelf(self: ITelegramStorage.SelfInfo | null): void {
|
||||
this._cachedSelf = self
|
||||
|
||||
return this._setToKv('self', self, true)
|
||||
}
|
||||
|
||||
|
@ -624,11 +660,43 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage*/ {
|
|||
peer: getInputPeer(peer),
|
||||
full: peer.full,
|
||||
})
|
||||
|
||||
// we have the full peer, we no longer need the references
|
||||
// we can skip this in the other branch, since in that case it would've already been deleted
|
||||
if (!this._cachedSelf?.isBot) {
|
||||
this._pending.push([this._statements.delAllMessageRefs, [peer.id]])
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
getPeerById(peerId: number): tl.TypeInputPeer | null {
|
||||
private _findPeerByReference(peerId: number): tl.TypeInputPeer | null {
|
||||
const row = this._statements.getMessageRef.get(peerId) as MessageRef | null
|
||||
if (!row) return null
|
||||
|
||||
const chat = this.getPeerById(row.chat_id, false)
|
||||
if (!chat) return null
|
||||
|
||||
if (peerId > 0) {
|
||||
// user
|
||||
return {
|
||||
_: 'inputPeerUserFromMessage',
|
||||
peer: chat,
|
||||
userId: peerId,
|
||||
msgId: row.msg_id,
|
||||
}
|
||||
}
|
||||
|
||||
// channel
|
||||
return {
|
||||
_: 'inputPeerChannelFromMessage',
|
||||
peer: chat,
|
||||
channelId: toggleChannelIdMark(peerId),
|
||||
msgId: row.msg_id,
|
||||
}
|
||||
}
|
||||
|
||||
getPeerById(peerId: number, allowRefs = true): tl.TypeInputPeer | null {
|
||||
const cached = this._cache?.get(peerId)
|
||||
if (cached) return cached.peer
|
||||
|
||||
|
@ -644,6 +712,10 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage*/ {
|
|||
return peer
|
||||
}
|
||||
|
||||
if (allowRefs) {
|
||||
return this._findPeerByReference(peerId)
|
||||
}
|
||||
|
||||
return null
|
||||
}
|
||||
|
||||
|
@ -699,6 +771,16 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage*/ {
|
|||
return null
|
||||
}
|
||||
|
||||
saveReferenceMessage(peerId: number, chatId: number, messageId: number): void {
|
||||
this._pending.push([this._statements.storeMessageRef, [peerId, chatId, messageId]])
|
||||
}
|
||||
|
||||
deleteReferenceMessages(chatId: number, messageIds: number[]): void {
|
||||
for (const id of messageIds) {
|
||||
this._pending.push([this._statements.delMessageRefs, [chatId, id]])
|
||||
}
|
||||
}
|
||||
|
||||
// IStateStorage implementation
|
||||
|
||||
getState(key: string, parse = true): unknown {
|
||||
|
|
|
@ -219,6 +219,51 @@ export function testStorage<T extends ITelegramStorage>(
|
|||
expect(await s.getFullPeerById(stubPeerUser.id)).toEqual(stubPeerUser.full)
|
||||
expect(await s.getFullPeerById(peerChannel.id)).toEqual(peerChannel.full)
|
||||
})
|
||||
|
||||
describe('min peers', () => {
|
||||
it('should generate *FromMessage constructors from reference messages', async () => {
|
||||
await s.updatePeers([peerChannel])
|
||||
await s.saveReferenceMessage(stubPeerUser.id, peerChannel.id, 456)
|
||||
await s.save?.() // update-related methods are batched, so we need to save
|
||||
|
||||
expect(await s.getPeerById(stubPeerUser.id)).toEqual({
|
||||
_: 'inputPeerUserFromMessage',
|
||||
peer: peerChannelInput,
|
||||
msgId: 456,
|
||||
userId: stubPeerUser.id,
|
||||
})
|
||||
})
|
||||
|
||||
it('should handle cases when referenced chat is not available', async () => {
|
||||
// this shouldn't really happen, but the storage should be able to handle it
|
||||
await s.saveReferenceMessage(stubPeerUser.id, peerChannel.id, 456)
|
||||
await s.save?.() // update-related methods are batched, so we need to save
|
||||
|
||||
expect(await s.getPeerById(stubPeerUser.id)).toEqual(null)
|
||||
})
|
||||
|
||||
it('should return full peer if it gets available', async () => {
|
||||
await s.updatePeers([peerChannel])
|
||||
await s.saveReferenceMessage(stubPeerUser.id, peerChannel.id, 456)
|
||||
await s.save?.() // update-related methods are batched, so we need to save
|
||||
|
||||
await s.updatePeers([stubPeerUser])
|
||||
await s.save?.()
|
||||
|
||||
expect(await s.getPeerById(stubPeerUser.id)).toEqual(peerUserInput)
|
||||
})
|
||||
|
||||
it('should handle cases when referenced message is deleted', async () => {
|
||||
await s.updatePeers([peerChannel])
|
||||
await s.saveReferenceMessage(stubPeerUser.id, peerChannel.id, 456)
|
||||
await s.save?.() // update-related methods are batched, so we need to save
|
||||
|
||||
await s.deleteReferenceMessages(peerChannel.id, [456])
|
||||
await s.save?.()
|
||||
|
||||
expect(await s.getPeerById(stubPeerUser.id)).toEqual(null)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('current user', () => {
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
TL schema and related utils used for mtcute.
|
||||
|
||||
Generated from TL layer **166** (last updated on 17.11.2023).
|
||||
Generated from TL layer **166** (last updated on 25.11.2023).
|
||||
|
||||
## About
|
||||
|
||||
|
|
File diff suppressed because one or more lines are too long
|
@ -1,24 +1,7 @@
|
|||
// for internal use
|
||||
// some custom types for internal use
|
||||
|
||||
---types---
|
||||
|
||||
dummyUpdate pts:int pts_count:int channel_id:int53 = Update;
|
||||
|
||||
// reactions
|
||||
// taken from OLD official
|
||||
// no longer work on newer layers because they changed this thing entirely
|
||||
// kept only as a historical reference
|
||||
|
||||
// ---types---
|
||||
//
|
||||
// updateMessageReactions peer:Peer msg_id:int reactions:MessageReactions = Update;
|
||||
// messageReactions flags:# min:flags.0?true results:Vector<ReactionCount> = MessageReactions;
|
||||
// reactionCount flags:# chosen:flags.0?true reaction:string count:int = ReactionCount;
|
||||
//
|
||||
// messageReactionsList flags:# count:int reactions:Vector<MessageUserReaction> users:Vector<User> next_offset:flags.0?string = MessageReactionsList;
|
||||
// messageUserReaction user_id:int reaction:string = MessageUserReaction;
|
||||
//
|
||||
// ---functions---
|
||||
//
|
||||
// messages.sendReaction flags:# peer:InputPeer msg_id:int reaction:flags.0?string = Updates;
|
||||
// messages.getMessagesReactions peer:InputPeer id:Vector<int> = Updates;
|
||||
// messages.getMessageReactionsList flags:# peer:InputPeer id:int reaction:flags.0?string offset:flags.1?string limit:int = MessageReactionsList;
|
||||
mtcute.dummyUpdate pts:int pts_count:int channel_id:int53 = Update;
|
||||
mtcute.dummyInputPeerMinUser user_id:int = InputPeer;
|
||||
mtcute.dummyInputPeerMinChannel channel_id:int = InputPeer;
|
||||
|
|
|
@ -66,6 +66,7 @@
|
|||
"messageActionInviteToGroupCall": ["users"],
|
||||
"messageEntityMentionName": ["user_id"],
|
||||
"messageMediaContact": ["user_id"],
|
||||
"messageMediaGiveaway": ["channels"],
|
||||
"messageReplies": ["channel_id"],
|
||||
"messageReplyStoryHeader": ["user_id"],
|
||||
"messageUserVote": ["user_id"],
|
||||
|
@ -122,6 +123,7 @@
|
|||
"updateChatUserTyping": ["chat_id"],
|
||||
"updateDeleteChannelMessages": ["channel_id"],
|
||||
"updateGroupCall": ["chat_id"],
|
||||
"updateGroupInvitePrivacyForbidden": ["chat_id"],
|
||||
"updateInlineBotCallbackQuery": ["user_id"],
|
||||
"updatePinnedChannelMessages": ["channel_id"],
|
||||
"updateReadChannelDiscussionInbox": ["channel_id", "broadcast_id"],
|
||||
|
@ -130,6 +132,7 @@
|
|||
"updateReadChannelOutbox": ["channel_id"],
|
||||
"updateShortChatMessage": ["from_id", "chat_id", "via_bot_id"],
|
||||
"updateShortMessage": ["user_id", "via_bot_id"],
|
||||
"updateUser": ["user_id"],
|
||||
"updateUserEmojiStatus": ["user_id"],
|
||||
"updateUserName": ["user_id"],
|
||||
"updateUserPhone": ["user_id"],
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"name": "@mtcute/tl",
|
||||
"version": "166.1.0",
|
||||
"version": "166.2.0",
|
||||
"description": "TL schema used for mtcute",
|
||||
"main": "index.js",
|
||||
"author": "Alina Sireneva <alina@tei.su>",
|
||||
|
|
Loading…
Reference in a new issue