feat(client): improved min peers and short updates handling
This commit is contained in:
parent
19d675e0fb
commit
c228085bfe
13 changed files with 343 additions and 379 deletions
|
@ -2985,6 +2985,7 @@ export class TelegramClient extends BaseTelegramClient {
|
|||
protected _oldPts: number
|
||||
protected _oldDate: number
|
||||
protected _oldSeq: number
|
||||
protected _selfChanged: boolean
|
||||
protected _cpts: Record<number, number>
|
||||
protected _cptsMod: Record<number, number>
|
||||
constructor(opts: BaseTelegramClient.Options) {
|
||||
|
@ -3005,6 +3006,8 @@ export class TelegramClient extends BaseTelegramClient {
|
|||
// modified channel pts, to avoid unnecessary
|
||||
// DB calls for not modified cpts
|
||||
this._cptsMod = {}
|
||||
|
||||
this._selfChanged = false
|
||||
}
|
||||
|
||||
acceptTos = acceptTos
|
||||
|
|
|
@ -39,6 +39,7 @@ export async function checkPassword(
|
|||
|
||||
this._userId = res.user.id
|
||||
this._isBot = false
|
||||
this._selfChanged = true
|
||||
await this._fetchUpdatesState()
|
||||
await this._saveStorage()
|
||||
|
||||
|
|
|
@ -19,7 +19,8 @@ export async function logOut(
|
|||
if (resetSession) {
|
||||
this._userId = null
|
||||
this._isBot = false
|
||||
this._pts /* = this._seq */ = this._date = undefined as any
|
||||
this._pts = this._seq = this._date = undefined as any
|
||||
this._selfChanged = true
|
||||
this.storage.reset()
|
||||
await this._saveStorage()
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ export async function recoverPassword(
|
|||
|
||||
this._userId = res.user.id
|
||||
this._isBot = false
|
||||
this._selfChanged = true
|
||||
await this._saveStorage()
|
||||
|
||||
return new User(this, res.user)
|
||||
|
|
|
@ -35,6 +35,7 @@ export async function signInBot(
|
|||
|
||||
this._userId = res.user.id
|
||||
this._isBot = true
|
||||
this._selfChanged = true
|
||||
await this._fetchUpdatesState()
|
||||
await this._saveStorage()
|
||||
|
||||
|
|
|
@ -43,6 +43,7 @@ export async function signIn(
|
|||
|
||||
this._userId = res.user.id
|
||||
this._isBot = false
|
||||
this._selfChanged = true
|
||||
await this._fetchUpdatesState()
|
||||
await this._saveStorage()
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ export async function signUp(
|
|||
|
||||
this._userId = res.user.id
|
||||
this._isBot = false
|
||||
this._selfChanged = true
|
||||
await this._fetchUpdatesState()
|
||||
await this._saveStorage()
|
||||
|
||||
|
|
|
@ -3,12 +3,15 @@ import { TelegramClient } from '../client'
|
|||
import {
|
||||
createUsersChatsIndex,
|
||||
normalizeToInputChannel,
|
||||
normalizeToInputUser,
|
||||
peerToInputPeer,
|
||||
} from '../utils/peer-utils'
|
||||
import { extractChannelIdFromUpdate } from '../utils/misc-utils'
|
||||
import bigInt from 'big-integer'
|
||||
import { AsyncLock, MAX_CHANNEL_ID } from '@mtcute/core'
|
||||
import {
|
||||
AsyncLock,
|
||||
getBarePeerId,
|
||||
getMarkedPeerId,
|
||||
markedPeerIdToBare,
|
||||
MAX_CHANNEL_ID,
|
||||
} from '@mtcute/core'
|
||||
import { isDummyUpdate, isDummyUpdates } from '../utils/updates-utils'
|
||||
import { ChatsIndex, UsersIndex } from '../types'
|
||||
|
||||
|
@ -32,6 +35,7 @@ interface UpdatesState {
|
|||
_oldPts: number
|
||||
_oldDate: number
|
||||
_oldSeq: number
|
||||
_selfChanged: boolean
|
||||
|
||||
_cpts: Record<number, number>
|
||||
_cptsMod: Record<number, number>
|
||||
|
@ -49,6 +53,8 @@ function _initializeUpdates(this: TelegramClient) {
|
|||
// modified channel pts, to avoid unnecessary
|
||||
// DB calls for not modified cpts
|
||||
this._cptsMod = {}
|
||||
|
||||
this._selfChanged = false
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -118,11 +124,12 @@ export async function _saveStorage(this: TelegramClient): Promise<void> {
|
|||
await this.storage.setManyChannelPts(this._cptsMod)
|
||||
this._cptsMod = {}
|
||||
}
|
||||
if (this._userId !== null) {
|
||||
if (this._userId !== null && this._selfChanged) {
|
||||
await this.storage.setSelf({
|
||||
userId: this._userId,
|
||||
isBot: this._isBot,
|
||||
})
|
||||
this._selfChanged = false
|
||||
}
|
||||
|
||||
await this.storage.save?.()
|
||||
|
@ -213,6 +220,162 @@ function _createNoDispatchIndex(
|
|||
return ret
|
||||
}
|
||||
|
||||
async function _replaceMinPeers(
|
||||
this: TelegramClient,
|
||||
upd: tl.TypeUpdates
|
||||
): Promise<boolean> {
|
||||
switch (upd._) {
|
||||
case 'updates':
|
||||
case 'updatesCombined': {
|
||||
for (let i = 0; i < upd.users.length; i++) {
|
||||
if ((upd.users[i] as any).min) {
|
||||
const cached = await this.storage.getFullPeerById(
|
||||
upd.users[i].id
|
||||
)
|
||||
if (!cached) return false
|
||||
upd.users[i] = cached as tl.TypeUser
|
||||
}
|
||||
}
|
||||
|
||||
for (let i = 0; i < upd.chats.length; i++) {
|
||||
const c = upd.chats[i]
|
||||
if ((c as any).min) {
|
||||
let id: number
|
||||
switch (c._) {
|
||||
case 'channel':
|
||||
case 'channelForbidden':
|
||||
id = MAX_CHANNEL_ID - c.id
|
||||
break
|
||||
default:
|
||||
id = -c.id
|
||||
}
|
||||
|
||||
const cached = await this.storage.getFullPeerById(id)
|
||||
if (!cached) return false
|
||||
upd.chats[i] = cached as tl.TypeChat
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
async function _fetchPeersForShort(
|
||||
this: TelegramClient,
|
||||
upd: tl.TypeUpdate | tl.RawMessage | tl.RawMessageService
|
||||
): Promise<{
|
||||
users: UsersIndex
|
||||
chats: ChatsIndex
|
||||
} | null> {
|
||||
const users: UsersIndex = {}
|
||||
const chats: ChatsIndex = {}
|
||||
|
||||
const fetchPeer = async (peer?: tl.TypePeer | number) => {
|
||||
if (!peer) return true
|
||||
|
||||
const bare =
|
||||
typeof peer === 'number'
|
||||
? markedPeerIdToBare(peer)
|
||||
: getBarePeerId(peer)
|
||||
|
||||
const marked = typeof peer === 'number' ? peer : getMarkedPeerId(peer)
|
||||
|
||||
const cached = await this.storage.getFullPeerById(marked)
|
||||
if (!cached) return false
|
||||
if (marked > 0) {
|
||||
users[bare] = cached as tl.TypeUser
|
||||
} else {
|
||||
chats[bare] = cached as tl.TypeChat
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
switch (upd._) {
|
||||
// not really sure if these can be inside updateShort, but whatever
|
||||
case 'message':
|
||||
case 'messageService':
|
||||
case 'updateNewMessage':
|
||||
case 'updateNewChannelMessage':
|
||||
case 'updateEditMessage':
|
||||
case 'updateEditChannelMessage': {
|
||||
const msg = upd._ === 'message' || upd._ === 'messageService' ? upd : upd.message
|
||||
if (msg._ === 'messageEmpty') return null
|
||||
|
||||
// ref: https://github.com/tdlib/td/blob/e1ebf743988edfcf4400cd5d33a664ff941dc13e/td/telegram/UpdatesManager.cpp#L412
|
||||
if (!(await fetchPeer(msg.peerId))) return null
|
||||
if (!(await fetchPeer(msg.fromId))) return null
|
||||
if (msg.replyTo && !(await fetchPeer(msg.replyTo.replyToPeerId)))
|
||||
return null
|
||||
if (msg._ !== 'messageService') {
|
||||
if (
|
||||
msg.fwdFrom &&
|
||||
(!(await fetchPeer(msg.fwdFrom.fromId)) ||
|
||||
!(await fetchPeer(msg.fwdFrom.savedFromPeer)))
|
||||
)
|
||||
return null
|
||||
if (!(await fetchPeer(msg.viaBotId))) return null
|
||||
|
||||
if (msg.entities) {
|
||||
for (const ent of msg.entities) {
|
||||
if (ent._ === 'messageEntityMentionName') {
|
||||
if (!(await fetchPeer(ent.userId))) return null
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (msg.media) {
|
||||
switch (msg.media._) {
|
||||
case 'messageMediaContact':
|
||||
if (
|
||||
msg.media.userId &&
|
||||
!(await fetchPeer(msg.media.userId))
|
||||
)
|
||||
return null
|
||||
}
|
||||
}
|
||||
} else {
|
||||
switch (msg.action._) {
|
||||
case 'messageActionChatCreate':
|
||||
case 'messageActionChatAddUser':
|
||||
case 'messageActionInviteToGroupCall':
|
||||
for (const user of msg.action.users) {
|
||||
if (!(await fetchPeer(user))) return null
|
||||
}
|
||||
break
|
||||
case 'messageActionChatJoinedByLink':
|
||||
if (!(await fetchPeer(msg.action.inviterId))) return null
|
||||
break
|
||||
case 'messageActionChatDeleteUser':
|
||||
if (!(await fetchPeer(msg.action.userId))) return null
|
||||
break
|
||||
case 'messageActionChatMigrateTo':
|
||||
if (!(await fetchPeer(MAX_CHANNEL_ID - msg.action.channelId))) return null
|
||||
break
|
||||
case 'messageActionChannelMigrateFrom':
|
||||
if (!(await fetchPeer(-msg.action.chatId))) return null
|
||||
break
|
||||
case 'messageActionGeoProximityReached':
|
||||
if (!(await fetchPeer(msg.action.fromId))) return null
|
||||
if (!(await fetchPeer(msg.action.toId))) return null
|
||||
break
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
case 'updateDraftMessage':
|
||||
if ('entities' in upd.draft && upd.draft.entities) {
|
||||
for (const ent of upd.draft.entities) {
|
||||
if (ent._ === 'messageEntityMentionName') {
|
||||
if (!(await fetchPeer(ent.userId))) return null
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return { users, chats }
|
||||
}
|
||||
|
||||
async function _loadDifference(
|
||||
this: TelegramClient,
|
||||
noDispatch?: NoDispatchIndex
|
||||
|
@ -461,7 +624,15 @@ export function _handleUpdate(
|
|||
return await _loadDifference.call(this)
|
||||
}
|
||||
|
||||
await this._cachePeersFrom(update)
|
||||
const hasMin = await this._cachePeersFrom(update)
|
||||
if (hasMin) {
|
||||
if (!(await _replaceMinPeers.call(this, update))) {
|
||||
// some min peer is not cached.
|
||||
// need to re-fetch the thing, and cache them on the way
|
||||
return await _loadDifference.call(this)
|
||||
}
|
||||
}
|
||||
|
||||
const { users, chats } = createUsersChatsIndex(update)
|
||||
|
||||
for (const upd of update.updates) {
|
||||
|
@ -535,7 +706,7 @@ export function _handleUpdate(
|
|||
}
|
||||
|
||||
if (!isDummyUpdates(update)) {
|
||||
this._seq = update.seq
|
||||
if (update.seq !== 0) this._seq = update.seq
|
||||
this._date = update.date
|
||||
}
|
||||
break
|
||||
|
@ -547,8 +718,17 @@ export function _handleUpdate(
|
|||
upd.dcOptions
|
||||
} else if (upd._ === 'updateConfig') {
|
||||
this._config = await this.call({ _: 'help.getConfig' })
|
||||
} else if (!noDispatch) {
|
||||
this.dispatchUpdate(upd, {}, {})
|
||||
} else {
|
||||
if (!noDispatch) {
|
||||
const peers = await _fetchPeersForShort.call(this, upd)
|
||||
if (!peers) {
|
||||
// some peer is not cached.
|
||||
// need to re-fetch the thing, and cache them on the way
|
||||
return await _loadDifference.call(this)
|
||||
}
|
||||
|
||||
this.dispatchUpdate(upd, peers.users, peers.chats)
|
||||
}
|
||||
}
|
||||
|
||||
this._date = update.date
|
||||
|
@ -557,6 +737,14 @@ export function _handleUpdate(
|
|||
case 'updateShortMessage': {
|
||||
if (noDispatch) return
|
||||
|
||||
const nextLocalPts = this._pts + update.ptsCount
|
||||
if (nextLocalPts > update.pts)
|
||||
// "the update was already applied, and must be ignored"
|
||||
return
|
||||
if (nextLocalPts < update.pts)
|
||||
// "there's an update gap that must be filled"
|
||||
return await _loadDifference.call(this)
|
||||
|
||||
const message: tl.RawMessage = {
|
||||
_: 'message',
|
||||
out: update.out,
|
||||
|
@ -581,102 +769,30 @@ export function _handleUpdate(
|
|||
ttlPeriod: update.ttlPeriod,
|
||||
}
|
||||
|
||||
// now we need to fetch info about users involved.
|
||||
// since this update is only used for PM, we can just
|
||||
// fetch the current user and the other user.
|
||||
// additionally, we need to handle "forwarded from"
|
||||
// field, as it may contain a user OR a channel
|
||||
const fwdFrom = update.fwdFrom?.fromId
|
||||
? peerToInputPeer(update.fwdFrom.fromId)
|
||||
: undefined
|
||||
|
||||
let rawUsers: tl.TypeUser[]
|
||||
{
|
||||
const id: tl.TypeInputUser[] = [
|
||||
{ _: 'inputUserSelf' },
|
||||
{
|
||||
_: 'inputUser',
|
||||
userId: update.userId,
|
||||
accessHash: bigInt.zero,
|
||||
},
|
||||
]
|
||||
|
||||
if (fwdFrom) {
|
||||
const inputUser = normalizeToInputUser(fwdFrom)
|
||||
if (inputUser) id.push(inputUser)
|
||||
}
|
||||
|
||||
rawUsers = await this.call({
|
||||
_: 'users.getUsers',
|
||||
id,
|
||||
})
|
||||
|
||||
if (rawUsers.length !== id.length) {
|
||||
// other user failed to load.
|
||||
// first try checking for input peer in storage
|
||||
const saved = await this.storage.getPeerById(
|
||||
update.userId
|
||||
)
|
||||
if (saved) {
|
||||
id[1] = normalizeToInputUser(saved)!
|
||||
|
||||
rawUsers = await this.call({
|
||||
_: 'users.getUsers',
|
||||
id,
|
||||
})
|
||||
}
|
||||
}
|
||||
if (rawUsers.length !== id.length) {
|
||||
// not saved (or invalid hash), not found by id
|
||||
// find that user in dialogs (since the update
|
||||
// is about an incoming message, dialog with that
|
||||
// user should be one of the first)
|
||||
const dialogs = await this.call({
|
||||
_: 'messages.getDialogs',
|
||||
offsetDate: 0,
|
||||
offsetId: 0,
|
||||
offsetPeer: { _: 'inputPeerEmpty' },
|
||||
limit: 20,
|
||||
hash: 0,
|
||||
})
|
||||
if (dialogs._ === 'messages.dialogsNotModified')
|
||||
return
|
||||
|
||||
const user = dialogs.users.find(
|
||||
(it) => it.id === update.userId
|
||||
)
|
||||
if (!user) {
|
||||
debug(
|
||||
"received updateShortMessage, but wasn't able to find User"
|
||||
)
|
||||
return
|
||||
}
|
||||
rawUsers.push(user)
|
||||
}
|
||||
}
|
||||
let rawChats: tl.TypeChat[] = []
|
||||
if (fwdFrom) {
|
||||
const inputChannel = normalizeToInputChannel(fwdFrom)
|
||||
if (inputChannel)
|
||||
rawChats = await this.call({
|
||||
_: 'channels.getChannels',
|
||||
id: [inputChannel],
|
||||
}).then((res) => res.chats)
|
||||
const peers = await _fetchPeersForShort.call(this, message)
|
||||
if (!peers) {
|
||||
// some peer is not cached.
|
||||
// need to re-fetch the thing, and cache them on the way
|
||||
return await _loadDifference.call(this)
|
||||
}
|
||||
|
||||
this._date = update.date
|
||||
this._pts = update.pts
|
||||
|
||||
const { users, chats } = createUsersChatsIndex({
|
||||
users: rawUsers,
|
||||
chats: rawChats,
|
||||
})
|
||||
this.dispatchUpdate(message, users, chats)
|
||||
this.dispatchUpdate(message, peers.users, peers.chats)
|
||||
break
|
||||
}
|
||||
case 'updateShortChatMessage': {
|
||||
if (noDispatch) return
|
||||
|
||||
const nextLocalPts = this._pts + update.ptsCount
|
||||
if (nextLocalPts > update.pts)
|
||||
// "the update was already applied, and must be ignored"
|
||||
return
|
||||
if (nextLocalPts < update.pts)
|
||||
// "there's an update gap that must be filled"
|
||||
return await _loadDifference.call(this)
|
||||
|
||||
const message: tl.RawMessage = {
|
||||
_: 'message',
|
||||
out: update.out,
|
||||
|
@ -701,98 +817,35 @@ export function _handleUpdate(
|
|||
ttlPeriod: update.ttlPeriod,
|
||||
}
|
||||
|
||||
// similarly to updateShortMessage, we need to fetch the sender
|
||||
// user and the chat, and also handle "forwarded from" info.
|
||||
const fwdFrom = update.fwdFrom?.fromId
|
||||
? peerToInputPeer(update.fwdFrom.fromId)
|
||||
: undefined
|
||||
|
||||
let rawUsers: tl.TypeUser[]
|
||||
{
|
||||
const id: tl.TypeInputUser[] = [
|
||||
{
|
||||
_: 'inputUser',
|
||||
userId: update.fromId,
|
||||
accessHash: bigInt.zero,
|
||||
},
|
||||
]
|
||||
|
||||
if (fwdFrom) {
|
||||
const inputUser = normalizeToInputUser(fwdFrom)
|
||||
if (inputUser) id.push(inputUser)
|
||||
}
|
||||
|
||||
rawUsers = await this.call({
|
||||
_: 'users.getUsers',
|
||||
id,
|
||||
})
|
||||
|
||||
if (rawUsers.length !== id.length) {
|
||||
// user failed to load.
|
||||
// first try checking for input peer in storage
|
||||
const saved = await this.storage.getPeerById(
|
||||
update.fromId
|
||||
)
|
||||
if (saved) {
|
||||
id[0] = normalizeToInputUser(saved)!
|
||||
|
||||
rawUsers = await this.call({
|
||||
_: 'users.getUsers',
|
||||
id,
|
||||
})
|
||||
}
|
||||
}
|
||||
if (rawUsers.length !== id.length) {
|
||||
// not saved (or invalid hash), not found by id
|
||||
// find that user in chat participants list
|
||||
const res = await this.call({
|
||||
_: 'messages.getFullChat',
|
||||
chatId: update.chatId,
|
||||
})
|
||||
|
||||
const user = res.users.find(
|
||||
(it) => it.id === update.fromId
|
||||
)
|
||||
if (!user) {
|
||||
debug(
|
||||
"received updateShortChatMessage, but wasn't able to find User"
|
||||
)
|
||||
return
|
||||
}
|
||||
rawUsers.push(user)
|
||||
}
|
||||
}
|
||||
const rawChats = await this.call({
|
||||
_: 'messages.getChats',
|
||||
id: [update.chatId],
|
||||
}).then((res) => res.chats)
|
||||
|
||||
if (fwdFrom) {
|
||||
const inputChannel = normalizeToInputChannel(fwdFrom)
|
||||
if (inputChannel) {
|
||||
const res = await this.call({
|
||||
_: 'channels.getChannels',
|
||||
id: [inputChannel],
|
||||
})
|
||||
rawChats.push(...res.chats)
|
||||
}
|
||||
const peers = await _fetchPeersForShort.call(this, message)
|
||||
if (!peers) {
|
||||
// some peer is not cached.
|
||||
// need to re-fetch the thing, and cache them on the way
|
||||
return await _loadDifference.call(this)
|
||||
}
|
||||
|
||||
this._date = update.date
|
||||
this._pts = update.pts
|
||||
|
||||
const { users, chats } = createUsersChatsIndex({
|
||||
users: rawUsers,
|
||||
chats: rawChats,
|
||||
})
|
||||
this.dispatchUpdate(message, users, chats)
|
||||
this.dispatchUpdate(message, peers.users, peers.chats)
|
||||
break
|
||||
}
|
||||
case 'updateShortSentMessage': // only store the new pts and date values
|
||||
case 'updateShortSentMessage': {
|
||||
// only store the new pts and date values
|
||||
// we never need to dispatch this
|
||||
|
||||
const nextLocalPts = this._pts + update.ptsCount
|
||||
if (nextLocalPts > update.pts)
|
||||
// "the update was already applied, and must be ignored"
|
||||
return
|
||||
if (nextLocalPts < update.pts)
|
||||
// "there's an update gap that must be filled"
|
||||
return await _loadDifference.call(this)
|
||||
|
||||
this._date = update.date
|
||||
this._pts = update.pts
|
||||
break
|
||||
}
|
||||
}
|
||||
})
|
||||
.catch((err) => this._emitError(err))
|
||||
|
|
|
@ -643,13 +643,19 @@ export class BaseTelegramClient {
|
|||
|
||||
/**
|
||||
* Adds all peers from a given object to entity cache in storage.
|
||||
*
|
||||
* @returns `true` if there were any `min` peers
|
||||
*/
|
||||
protected async _cachePeersFrom(obj: any): Promise<void> {
|
||||
protected async _cachePeersFrom(obj: any): Promise<boolean> {
|
||||
const parsedPeers: ITelegramStorage.PeerInfo[] = []
|
||||
|
||||
let hadMin = false
|
||||
|
||||
for (const peer of getAllPeersFrom(obj)) {
|
||||
if ((peer as any).min && !peer.fromMessage && !(peer as any).bot) {
|
||||
debug('peer is min, but no context was found: %o %o', obj, peer)
|
||||
if ((peer as any).min) {
|
||||
// absolutely incredible min peer handling, courtesy of levlam.
|
||||
// see this thread: https://t.me/tdlibchat/15084
|
||||
hadMin = true
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -658,11 +664,10 @@ export class BaseTelegramClient {
|
|||
parsedPeers.push({
|
||||
id: peer.id,
|
||||
accessHash: peer.accessHash!,
|
||||
username: peer.username?.toLowerCase() ?? null,
|
||||
phone: peer.phone ?? null,
|
||||
type: peer.bot ? 'bot' : 'user',
|
||||
updated: 0,
|
||||
fromMessage: peer.fromMessage,
|
||||
username: peer.username?.toLowerCase(),
|
||||
phone: peer.phone,
|
||||
type: 'user',
|
||||
full: peer,
|
||||
})
|
||||
break
|
||||
case 'chat':
|
||||
|
@ -670,11 +675,8 @@ export class BaseTelegramClient {
|
|||
parsedPeers.push({
|
||||
id: -peer.id,
|
||||
accessHash: bigInt.zero,
|
||||
username: null,
|
||||
phone: null,
|
||||
type: 'group',
|
||||
updated: 0,
|
||||
fromMessage: peer.fromMessage,
|
||||
type: 'chat',
|
||||
full: peer,
|
||||
})
|
||||
break
|
||||
case 'channel':
|
||||
|
@ -684,12 +686,10 @@ export class BaseTelegramClient {
|
|||
accessHash: peer.accessHash!,
|
||||
username:
|
||||
peer._ === 'channel'
|
||||
? peer.username?.toLowerCase() ?? null
|
||||
: null,
|
||||
phone: null,
|
||||
type: peer.broadcast ? 'channel' : 'supergroup',
|
||||
updated: 0,
|
||||
fromMessage: peer.fromMessage,
|
||||
? peer.username?.toLowerCase()
|
||||
: undefined,
|
||||
type: 'channel',
|
||||
full: peer,
|
||||
})
|
||||
break
|
||||
}
|
||||
|
@ -697,5 +697,7 @@ export class BaseTelegramClient {
|
|||
|
||||
await this.storage.updatePeers(parsedPeers)
|
||||
await this._saveStorage()
|
||||
|
||||
return hadMin
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,18 +1,16 @@
|
|||
import { MaybeAsync, PeerType } from '../types'
|
||||
import { BasicPeerType, MaybeAsync } from '../types'
|
||||
import { tl } from '@mtcute/tl'
|
||||
import { MAX_CHANNEL_ID } from '../utils/peer-utils'
|
||||
|
||||
export namespace ITelegramStorage {
|
||||
export interface PeerInfo {
|
||||
// marked id
|
||||
id: number
|
||||
accessHash: tl.Long
|
||||
type: PeerType
|
||||
username: string | null
|
||||
phone: string | null
|
||||
updated: number
|
||||
// marked peer id of chat, message id
|
||||
fromMessage?: [number, number]
|
||||
type: BasicPeerType
|
||||
username?: string
|
||||
phone?: string
|
||||
|
||||
full: tl.TypeUser | tl.TypeChat
|
||||
}
|
||||
|
||||
export interface SelfInfo {
|
||||
|
@ -132,6 +130,18 @@ export interface ITelegramStorage {
|
|||
*/
|
||||
setManyChannelPts(values: Record<number, number>): MaybeAsync<void>
|
||||
|
||||
/**
|
||||
* Get cached peer information by their marked ID.
|
||||
* Return `null` if caching is not supported, or the entity
|
||||
* is not cached (yet).
|
||||
*
|
||||
* This is primarily used when a `min` entity is encountered
|
||||
* in an update, or when a *short* update is encountered.
|
||||
* Returning `null` will require re-fetching that
|
||||
* update with the peers added, which might not be very efficient.
|
||||
*/
|
||||
getFullPeerById(id: number): MaybeAsync<tl.TypeUser | tl.TypeChat | null>
|
||||
|
||||
// TODO!
|
||||
// exportToString(): MaybeAsync<string>
|
||||
// importFromString(): MaybeAsync<void>
|
||||
|
|
|
@ -2,9 +2,12 @@ import { ITelegramStorage } from './abstract'
|
|||
import { MaybeAsync } from '../types'
|
||||
import { tl } from '@mtcute/tl'
|
||||
import { MAX_CHANNEL_ID } from '../utils/peer-utils'
|
||||
import { LruMap } from '../utils/lru-map'
|
||||
|
||||
const CURRENT_VERSION = 1
|
||||
|
||||
type PeerInfoWithUpdated = ITelegramStorage.PeerInfo & { updated: number }
|
||||
|
||||
interface MemorySessionState {
|
||||
// forwards compatibility for persistent storages
|
||||
$version: typeof CURRENT_VERSION
|
||||
|
@ -13,7 +16,7 @@ interface MemorySessionState {
|
|||
authKeys: Record<number, Buffer | null>
|
||||
|
||||
// marked peer id -> entity info
|
||||
entities: Record<number, ITelegramStorage.PeerInfo>
|
||||
entities: Record<number, PeerInfoWithUpdated>
|
||||
// phone number -> peer id
|
||||
phoneIndex: Record<string, number>
|
||||
// username -> peer id
|
||||
|
@ -33,8 +36,23 @@ export class MemoryStorage implements ITelegramStorage {
|
|||
protected _state: MemorySessionState
|
||||
private _cachedInputPeers: Record<number, tl.TypeInputPeer> = {}
|
||||
|
||||
constructor() {
|
||||
private _cachedFull: LruMap<number, tl.TypeUser | tl.TypeChat>
|
||||
|
||||
constructor(params?: {
|
||||
/**
|
||||
* Maximum number of cached full entities.
|
||||
*
|
||||
* Note that full entities are **NOT** persisted
|
||||
* to the disk (in case this storage is backed
|
||||
* by a local storage), and only available within
|
||||
* the current runtime.
|
||||
*
|
||||
* Defaults to `100`, use `0` to disable
|
||||
*/
|
||||
cacheSize?: number
|
||||
}) {
|
||||
this.reset()
|
||||
this._cachedFull = new LruMap(params?.cacheSize ?? 100)
|
||||
}
|
||||
|
||||
reset(): void {
|
||||
|
@ -97,13 +115,15 @@ export class MemoryStorage implements ITelegramStorage {
|
|||
return this._state.authKeys[dcId] ?? null
|
||||
}
|
||||
|
||||
updatePeers(peers: ITelegramStorage.PeerInfo[]): MaybeAsync<void> {
|
||||
updatePeers(peers: PeerInfoWithUpdated[]): MaybeAsync<void> {
|
||||
for (const peer of peers) {
|
||||
this._cachedFull.set(peer.id, peer.full)
|
||||
|
||||
peer.updated = Date.now()
|
||||
const old = this._state.entities[peer.id]
|
||||
if (old) {
|
||||
// min peer
|
||||
if (peer.fromMessage) continue
|
||||
// if (peer.fromMessage) continue
|
||||
|
||||
// delete old index entries if needed
|
||||
if (old.username && old.username !== peer.username) {
|
||||
|
@ -121,45 +141,28 @@ export class MemoryStorage implements ITelegramStorage {
|
|||
}
|
||||
}
|
||||
|
||||
protected _getInputPeer(peerInfo?: ITelegramStorage.PeerInfo): tl.TypeInputPeer | null {
|
||||
protected _getInputPeer(
|
||||
peerInfo?: ITelegramStorage.PeerInfo
|
||||
): tl.TypeInputPeer | null {
|
||||
if (!peerInfo) return null
|
||||
if (peerInfo.type === 'user' || peerInfo.type === 'bot') {
|
||||
if (peerInfo.fromMessage) {
|
||||
switch (peerInfo.type) {
|
||||
case 'user':
|
||||
return {
|
||||
_: 'inputPeerUserFromMessage',
|
||||
peer: this.getPeerById(peerInfo.fromMessage[0])!,
|
||||
msgId: peerInfo.fromMessage[1],
|
||||
userId: peerInfo.id
|
||||
_: 'inputPeerUser',
|
||||
userId: peerInfo.id,
|
||||
accessHash: peerInfo.accessHash,
|
||||
}
|
||||
}
|
||||
return {
|
||||
_: 'inputPeerUser',
|
||||
userId: peerInfo.id,
|
||||
accessHash: peerInfo.accessHash,
|
||||
}
|
||||
}
|
||||
|
||||
if (peerInfo.type === 'group')
|
||||
return {
|
||||
_: 'inputPeerChat',
|
||||
chatId: -peerInfo.id,
|
||||
}
|
||||
|
||||
if (peerInfo.type === 'channel' || peerInfo.type === 'supergroup') {
|
||||
if (peerInfo.fromMessage) {
|
||||
case 'chat':
|
||||
return {
|
||||
_: 'inputPeerChannelFromMessage',
|
||||
peer: this.getPeerById(peerInfo.fromMessage[0])!,
|
||||
msgId: peerInfo.fromMessage[1],
|
||||
channelId: peerInfo.id
|
||||
_: 'inputPeerChat',
|
||||
chatId: -peerInfo.id,
|
||||
}
|
||||
case 'channel':
|
||||
return {
|
||||
_: 'inputPeerChannel',
|
||||
channelId: MAX_CHANNEL_ID - peerInfo.id,
|
||||
accessHash: peerInfo.accessHash,
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
_: 'inputPeerChannel',
|
||||
channelId: MAX_CHANNEL_ID - peerInfo.id,
|
||||
accessHash: peerInfo.accessHash,
|
||||
}
|
||||
}
|
||||
|
||||
throw new Error(`Invalid peer type: ${peerInfo.type}`)
|
||||
|
@ -208,11 +211,26 @@ export class MemoryStorage implements ITelegramStorage {
|
|||
return this._state.pts[entityId] ?? null
|
||||
}
|
||||
|
||||
setCommonPts(val: [number, number, number]): void {
|
||||
this._state.gpts = val
|
||||
}
|
||||
|
||||
getCommonPts(): [number, number, number] | null {
|
||||
getUpdatesState(): MaybeAsync<[number, number, number] | null> {
|
||||
return this._state.gpts ?? null
|
||||
}
|
||||
|
||||
setUpdatesPts(val: number): MaybeAsync<void> {
|
||||
if (!this._state.gpts) this._state.gpts = [0, 0, 0]
|
||||
this._state.gpts[0] = val
|
||||
}
|
||||
|
||||
setUpdatesDate(val: number): MaybeAsync<void> {
|
||||
if (!this._state.gpts) this._state.gpts = [0, 0, 0]
|
||||
this._state.gpts[1] = val
|
||||
}
|
||||
|
||||
setUpdatesSeq(val: number): MaybeAsync<void> {
|
||||
if (!this._state.gpts) this._state.gpts = [0, 0, 0]
|
||||
this._state.gpts[2] = val
|
||||
}
|
||||
|
||||
getFullPeerById(id: number): tl.TypeUser | tl.TypeChat | null {
|
||||
return this._cachedFull.get(id) ?? null
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,9 +2,3 @@
|
|||
* Peer types that have one-to-one relation to `tl.Peer*` types.
|
||||
*/
|
||||
export type BasicPeerType = 'user' | 'chat' | 'channel'
|
||||
|
||||
/**
|
||||
* More extensive peer types, that differentiate between
|
||||
* users and bots, channels and supergroups.
|
||||
*/
|
||||
export type PeerType = 'user' | 'bot' | 'group' | 'channel' | 'supergroup'
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import { tl } from '@mtcute/tl'
|
||||
import { BasicPeerType, PeerType } from '../types'
|
||||
import { BasicPeerType } from '../types'
|
||||
|
||||
export const MIN_CHANNEL_ID = -1002147483647
|
||||
export const MAX_CHANNEL_ID = -1000000000000
|
||||
|
@ -30,23 +30,20 @@ export function getBarePeerId(peer: tl.TypePeer): number {
|
|||
*/
|
||||
export function getMarkedPeerId(
|
||||
peerId: number,
|
||||
peerType: BasicPeerType | PeerType
|
||||
peerType: BasicPeerType
|
||||
): number
|
||||
export function getMarkedPeerId(peer: tl.TypePeer | tl.TypeInputPeer): number
|
||||
export function getMarkedPeerId(
|
||||
peer: tl.TypePeer | tl.TypeInputPeer | number,
|
||||
peerType?: BasicPeerType | PeerType
|
||||
peerType?: BasicPeerType
|
||||
): number {
|
||||
if (typeof peer === 'number') {
|
||||
switch (peerType) {
|
||||
case 'user':
|
||||
case 'bot':
|
||||
return peer
|
||||
case 'chat':
|
||||
case 'group':
|
||||
return -peer
|
||||
case 'channel':
|
||||
case 'supergroup':
|
||||
return MAX_CHANNEL_ID - peer
|
||||
}
|
||||
throw new Error('Invalid peer type')
|
||||
|
@ -106,114 +103,6 @@ export function markedPeerIdToBare(peerId: number): number {
|
|||
throw new Error('Invalid marked peer id')
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert {@link PeerType} to {@link BasicPeerType}
|
||||
*/
|
||||
export function peerTypeToBasic(type: PeerType): BasicPeerType {
|
||||
switch (type) {
|
||||
case 'bot':
|
||||
case 'user':
|
||||
return 'user'
|
||||
case 'group':
|
||||
return 'chat'
|
||||
case 'channel':
|
||||
case 'supergroup':
|
||||
return 'channel'
|
||||
}
|
||||
}
|
||||
|
||||
function comparePeers(
|
||||
first: tl.TypePeer | undefined,
|
||||
second: tl.TypePeer | tl.TypeUser | tl.TypeChat
|
||||
): boolean {
|
||||
if (!first) return false
|
||||
|
||||
if ('userId' in first) {
|
||||
if ('userId' in second) return first.userId === second.userId
|
||||
switch (second._) {
|
||||
case 'user':
|
||||
case 'userEmpty':
|
||||
return first.userId === second.id
|
||||
}
|
||||
}
|
||||
if ('chatId' in first) {
|
||||
if ('chatId' in second) return first.chatId === second.chatId
|
||||
switch (second._) {
|
||||
case 'chat':
|
||||
case 'chatForbidden':
|
||||
case 'chatEmpty':
|
||||
return first.chatId === second.id
|
||||
}
|
||||
}
|
||||
if ('channelId' in first) {
|
||||
if ('channelId' in second) return first.channelId === second.channelId
|
||||
switch (second._) {
|
||||
case 'channel':
|
||||
case 'channelForbidden':
|
||||
return first.channelId === second.id
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
function isRefMessage(msg: tl.TypeMessage, peer: any): boolean | undefined {
|
||||
return (
|
||||
comparePeers(msg.peerId, peer) ||
|
||||
('fromId' in msg && comparePeers(msg.fromId, peer)) ||
|
||||
('fwdFrom' in msg &&
|
||||
msg.fwdFrom &&
|
||||
comparePeers(msg.fwdFrom.fromId, peer)) ||
|
||||
('replies' in msg &&
|
||||
msg.replies &&
|
||||
msg.replies.recentRepliers &&
|
||||
msg.replies.recentRepliers.some((it) => comparePeers(it, peer)))
|
||||
)
|
||||
}
|
||||
|
||||
function findContext(obj: any, peer: any): [number, number] | undefined {
|
||||
if (!peer.min) return undefined
|
||||
switch (obj._) {
|
||||
case 'updates':
|
||||
case 'updatesCombined':
|
||||
case 'updates.difference':
|
||||
case 'updates.differenceSlice':
|
||||
for (const upd of (obj.updates ||
|
||||
obj.otherUpdates) as tl.TypeUpdate[]) {
|
||||
switch (upd._) {
|
||||
case 'updateNewMessage':
|
||||
case 'updateNewChannelMessage':
|
||||
case 'updateEditMessage':
|
||||
case 'updateEditChannelMessage':
|
||||
if (isRefMessage(upd.message, peer)) {
|
||||
return [
|
||||
getMarkedPeerId(upd.message.peerId!),
|
||||
upd.message.id,
|
||||
]
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
break
|
||||
case 'updateShortMessage':
|
||||
return [obj.userId, obj.id]
|
||||
case 'updateShortChatMessage':
|
||||
return [-obj.chatId, obj.id]
|
||||
}
|
||||
|
||||
if ('messages' in obj || 'newMessages' in obj) {
|
||||
for (const msg of (obj.messages ||
|
||||
obj.newMessages) as tl.TypeMessage[]) {
|
||||
if (isRefMessage(msg, peer)) {
|
||||
return [getMarkedPeerId(msg.peerId!), msg.id]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// im not sure if this is exhaustive check or not
|
||||
|
||||
return undefined
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts all (cacheable) entities from a TlObject or a list of them.
|
||||
* Only checks `.user`, `.chat`, `.channel`, `.users` and `.chats` properties
|
||||
|
@ -221,7 +110,7 @@ function findContext(obj: any, peer: any): [number, number] | undefined {
|
|||
export function* getAllPeersFrom(
|
||||
obj: any
|
||||
): Iterable<
|
||||
(tl.TypeUser | tl.TypeChat) & { fromMessage: [number, number] | undefined }
|
||||
(tl.TypeUser | tl.TypeChat)
|
||||
> {
|
||||
if (typeof obj !== 'object') return
|
||||
|
||||
|
@ -272,13 +161,6 @@ export function* getAllPeersFrom(
|
|||
for (const user of obj.users) {
|
||||
// .users is sometimes number[]
|
||||
if (typeof user === 'object' && user._ === 'user') {
|
||||
if (user.min && !user.bot) {
|
||||
// min seems to be set for @Channel_Bot,
|
||||
// but we don't really need to cache its context
|
||||
// (we don't need to cache it at all, really, but whatever)
|
||||
user.fromMessage = findContext(obj, user)
|
||||
}
|
||||
|
||||
yield user
|
||||
}
|
||||
}
|
||||
|
@ -293,10 +175,6 @@ export function* getAllPeersFrom(
|
|||
case 'channel':
|
||||
case 'chatForbidden':
|
||||
case 'channelForbidden':
|
||||
if (chat.min) {
|
||||
chat.fromMessage = findContext(obj, chat)
|
||||
}
|
||||
|
||||
yield chat
|
||||
break
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue