fix(client): better updates handling, also emit updates returned by RPC methods

This commit is contained in:
teidesu 2021-04-24 16:23:30 +03:00
parent 58f4356d58
commit c863e7a854
22 changed files with 265 additions and 39 deletions

View file

@ -1545,6 +1545,9 @@ export interface TelegramClient extends BaseTelegramClient {
* Also note that entity maps may contain entities that are not
* used in this particular update, so do not rely on its contents.
*
* `update` might contain a Message object - in this case,
* it should be interpreted as some kind of `updateNewMessage`.
*
* @param update Update that has just happened
* @param users Map of users in this update
* @param chats Map of chats in this update

View file

@ -34,15 +34,16 @@ export async function addChatMembers(
const p = normalizeToInputUser(await this.resolvePeer(user))
if (!p) continue
await this.call({
const updates = await this.call({
_: 'messages.addChatUser',
chatId: input.chatId,
userId: p,
fwdLimit: forwardCount,
})
this._handleUpdate(updates)
}
} else if (input._ === 'inputPeerChannel') {
await this.call({
const updates = await this.call({
_: 'channels.inviteToChannel',
channel: normalizeToInputChannel(chat)!,
users: await Promise.all(
@ -52,5 +53,6 @@ export async function addChatMembers(
).then((res) => res.filter(Boolean)) as tl.TypeInputUser[],
fwdLimit: forwardCount,
})
this._handleUpdate(updates)
} else throw new MtCuteInvalidPeerTypeError(chatId, 'chat or channel')
}

View file

@ -26,8 +26,9 @@ export async function archiveChats(
})
}
await this.call({
const updates = await this.call({
_: 'folders.editPeerFolders',
folderPeers
})
this._handleUpdate(updates)
}

View file

@ -29,5 +29,7 @@ export async function createChannel(
)
}
this._handleUpdate(res)
return new Chat(this, res.chats[0])
}

View file

@ -42,5 +42,7 @@ export async function createGroup(
)
}
this._handleUpdate(res)
return new Chat(this, res.chats[0])
}

View file

@ -28,5 +28,7 @@ export async function createSupergroup(
)
}
this._handleUpdate(res)
return new Chat(this, res.chats[0])
}

View file

@ -13,8 +13,9 @@ export async function deleteChannel(this: TelegramClient, chatId: InputPeerLike)
const peer = normalizeToInputChannel(await this.resolvePeer(chatId))
if (!peer) throw new MtCuteInvalidPeerTypeError(chatId, 'channel')
await this.call({
const res = await this.call({
_: 'channels.deleteChannel',
channel: peer
})
this._handleUpdate(res)
}

View file

@ -21,17 +21,20 @@ export async function deleteChatPhoto(
if (!(chat._ === 'inputPeerChat' || chat._ === 'inputPeerChannel'))
throw new MtCuteInvalidPeerTypeError(chatId, 'chat or channel')
let res
if (chat._ === 'inputPeerChat') {
await this.call({
res = await this.call({
_: 'messages.editChatPhoto',
chatId: chat.chatId,
photo: { _: 'inputChatPhotoEmpty' }
})
} else {
await this.call({
res = await this.call({
_: 'channels.editPhoto',
channel: normalizeToInputChannel(chat)!,
photo: { _: 'inputChatPhotoEmpty' }
})
}
this._handleUpdate(res)
}

View file

@ -16,12 +16,13 @@ export async function deleteGroup(
if (chat._ !== 'inputPeerChat')
throw new MtCuteInvalidPeerTypeError(chatId, 'chat')
await this.call({
const res = await this.call({
_: 'messages.deleteChatUser',
revokeHistory: true,
chatId: chat.chatId,
userId: { _: 'inputUserSelf' },
})
this._handleUpdate(res)
await this.call({
_: 'messages.deleteChat',

View file

@ -5,7 +5,10 @@ import {
MtCuteNotFoundError,
MtCuteTypeAssertionError,
} from '../../types'
import { INVITE_LINK_REGEX, normalizeToInputChannel } from '../../utils/peer-utils'
import {
INVITE_LINK_REGEX,
normalizeToInputChannel,
} from '../../utils/peer-utils'
/**
* Join a channel or supergroup
@ -34,6 +37,8 @@ export async function joinChat(
)
}
this._handleUpdate(res)
return new Chat(this, res.chats[0])
}
}
@ -53,5 +58,7 @@ export async function joinChat(
)
}
this._handleUpdate(res)
return new Chat(this, res.chats[0])
}

View file

@ -21,16 +21,18 @@ export async function leaveChat(
const input = normalizeToInputPeer(chat)
if (input._ === 'inputPeerChannel') {
await this.call({
const res = await this.call({
_: 'channels.leaveChannel',
channel: normalizeToInputChannel(chat)!,
})
this._handleUpdate(res)
} else if (input._ === 'inputPeerChat') {
await this.call({
const res = await this.call({
_: 'messages.deleteChatUser',
chatId: input.chatId,
userId: { _: 'inputUserSelf' },
})
this._handleUpdate(res)
if (clear) {
await this.deleteHistory(input)

View file

@ -59,5 +59,7 @@ export async function setChatDefaultPermissions(
)
}
this._handleUpdate(res)
return new Chat(this, res.chats[0])
}

View file

@ -37,6 +37,8 @@ export async function setChatPhoto(
if (typeof media === 'string' && media.match(/^https?:\/\//)) {
throw new MtCuteArgumentError("Chat photo can't be external")
} else if (typeof media === 'object' && tl.isAnyInputMedia(media)) {
throw new MtCuteArgumentError("Chat photo can't be InputMedia")
} else if (isUploadedFile(media)) {
input = media.inputFile
} else if (typeof media === 'object' && tl.isAnyInputFile(media)) {
@ -54,17 +56,19 @@ export async function setChatPhoto(
videoStartTs: previewSec
}
let res
if (chat._ === 'inputPeerChat') {
await this.call({
res = await this.call({
_: 'messages.editChatPhoto',
chatId: chat.chatId,
photo
})
} else {
await this.call({
res = await this.call({
_: 'channels.editPhoto',
channel: normalizeToInputChannel(chat)!,
photo
})
}
this._handleUpdate(res)
}

View file

@ -23,17 +23,19 @@ export async function setChatTitle(
if (!(chat._ === 'inputPeerChat' || chat._ === 'inputPeerChannel'))
throw new MtCuteInvalidPeerTypeError(chatId, 'chat or channel')
let res
if (chat._ === 'inputPeerChat') {
await this.call({
res = await this.call({
_: 'messages.editChatTitle',
chatId: chat.chatId,
title
})
} else {
await this.call({
res = await this.call({
_: 'channels.editTitle',
channel: normalizeToInputChannel(chat)!,
title
})
}
this._handleUpdate(res)
}

View file

@ -20,9 +20,10 @@ export async function setSlowMode(
const chat = normalizeToInputChannel(await this.resolvePeer(chatId))
if (!chat) throw new MtCuteInvalidPeerTypeError(chatId, 'channel')
await this.call({
const res = await this.call({
_: 'channels.toggleSlowMode',
channel: chat,
seconds
})
this._handleUpdate(res)
}

View file

@ -26,8 +26,9 @@ export async function unarchiveChats(
})
}
await this.call({
const res = await this.call({
_: 'folders.editPeerFolders',
folderPeers
})
this._handleUpdate(res)
}

View file

@ -37,5 +37,7 @@ export async function deleteMessages(
})
}
this._pts = res.pts
return !!res.ptsCount
}

View file

@ -16,6 +16,8 @@ export function _findMessageInUpdate(
res._
)
this._handleUpdate(res, true)
for (const u of res.updates) {
if (
isEdit && (

View file

@ -21,11 +21,13 @@ export async function pinMessage(
notify = false,
bothSides = false
): Promise<void> {
await this.call({
const res = await this.call({
_: 'messages.updatePinnedMessage',
peer: normalizeToInputPeer(await this.resolvePeer(chatId)),
id: messageId,
silent: !notify,
pmOneside: !bothSides
})
this._handleUpdate(res)
}

View file

@ -109,6 +109,9 @@ export async function sendText(
entities: res.entities,
}
this._pts = res.pts
this._date = res.date
return new Message(this, msg, {}, {})
}

View file

@ -17,10 +17,12 @@ export async function unpinMessage(
chatId: InputPeerLike,
messageId: number,
): Promise<void> {
await this.call({
const res = await this.call({
_: 'messages.updatePinnedMessage',
peer: normalizeToInputPeer(await this.resolvePeer(chatId)),
id: messageId,
unpin: true
})
this._handleUpdate(res)
}

View file

@ -18,6 +18,8 @@ const debug = require('debug')('mtcute:upds')
// tldr server sends multiple `updates` with the same seq, and that seq
// is also larger than the seq in the initial updates.getState response
// also code in this file is very bad, thanks to Telegram's awesome updates mechanism
// @extension
interface UpdatesState {
_updLock: Lock
@ -134,7 +136,64 @@ export function dispatchUpdate(
// no-op //
}
async function _loadDifference(this: TelegramClient): Promise<void> {
interface NoDispatchIndex {
// channel id or 0 => msg id
msg: Record<number, Record<number, true>>
// channel id or 0 => pts
pts: Record<number, Record<number, true>>
}
// creating and using a no-dispatch index is pretty expensive,
// but its not a big deal since it's actually rarely needed
function _createNoDispatchIndex(
updates?: tl.TypeUpdates
): NoDispatchIndex | undefined {
if (!updates) return undefined
const ret: NoDispatchIndex = {
msg: {},
pts: {},
}
if (updates._ === 'updates' || updates._ === 'updatesCombined') {
updates.updates.forEach((upd) => {
const cid = extractChannelIdFromUpdate(upd) ?? 0
if (
upd._ === 'updateNewMessage' ||
upd._ === 'updateNewChannelMessage'
) {
if (!ret.msg[cid]) ret.msg[cid] = {}
ret.msg[cid][upd.message.id] = true
}
const pts = 'pts' in upd ? upd.pts : undefined
if (pts) {
if (!ret.msg[cid]) ret.msg[cid] = {}
ret.msg[cid][pts] = true
}
})
}
if (
updates._ === 'updateShortMessage' ||
updates._ === 'updateShortChatMessage' ||
updates._ === 'updateShortSentMessage'
) {
// these updates are only used for non-channel messages, so we use 0
if (!ret.msg[0]) ret.msg[0] = {}
if (!ret.pts[0]) ret.pts[0] = {}
ret.msg[0][updates.id] = true
ret.pts[0][updates.pts] = true
}
return ret
}
async function _loadDifference(
this: TelegramClient,
noDispatch?: NoDispatchIndex
): Promise<void> {
for (;;) {
const diff = await this.call({
_: 'updates.getDifference',
@ -158,12 +217,59 @@ async function _loadDifference(this: TelegramClient): Promise<void> {
const { users, chats } = createUsersChatsIndex(diff)
diff.newMessages.forEach((message) =>
diff.newMessages.forEach((message) => {
if (noDispatch) {
// in fact this field seems to only be used for PMs and legacy chats,
// so `cid` will be 0 always, but that might change :shrug:
const cid =
message.peerId?._ === 'peerChannel'
? message.peerId.channelId
: 0
if (noDispatch.msg[cid][message.id]) return
}
this.dispatchUpdate(message, users, chats)
)
diff.otherUpdates.forEach((upd) =>
})
for (const upd of diff.otherUpdates) {
const cid = extractChannelIdFromUpdate(upd)
const pts = 'pts' in upd ? upd.pts : undefined
const ptsCount = 'ptsCount' in upd ? upd.ptsCount : undefined
if (cid && pts !== undefined && ptsCount !== undefined) {
// check that this pts is in fact the next one
// we only need to check this for channels since for
// common pts it is guaranteed by the server
// (however i would not really trust telegram server lol)
let nextLocalPts
if (cid in this._cpts) nextLocalPts = this._cpts[cid] + ptsCount
else {
const saved = await this.storage.getChannelPts(cid)
if (saved) {
this._cpts[cid] = saved
nextLocalPts = saved + ptsCount
} else {
nextLocalPts = null
}
}
if (nextLocalPts) {
if (nextLocalPts > pts) continue
if (nextLocalPts < pts) {
await _loadChannelDifference.call(this, cid, noDispatch)
continue
}
}
this._cpts[cid] = pts
}
if (noDispatch && pts) {
if (noDispatch.pts[cid ?? 0][pts]) continue
}
this.dispatchUpdate(upd, users, chats)
)
}
this._pts = state.pts
this._date = state.date
@ -174,7 +280,8 @@ async function _loadDifference(this: TelegramClient): Promise<void> {
async function _loadChannelDifference(
this: TelegramClient,
channelId: number
channelId: number,
noDispatch?: NoDispatchIndex
): Promise<void> {
let channel
try {
@ -209,17 +316,32 @@ async function _loadChannelDifference(
const { users, chats } = createUsersChatsIndex(diff)
diff.newMessages.forEach((message) =>
diff.newMessages.forEach((message) => {
if (noDispatch && noDispatch.msg[channelId][message.id]) return
this.dispatchUpdate(message, users, chats)
)
diff.otherUpdates.forEach((upd) =>
})
diff.otherUpdates.forEach((upd) => {
if (noDispatch) {
const pts = 'pts' in upd ? upd.pts : undefined
// we don't check for pts sequence here since the server
// is expected to return them in a correct order
// again, i would not trust Telegram server that much,
// but checking pts here seems like an overkill
if (pts && noDispatch.pts[channelId][pts]) return
}
this.dispatchUpdate(upd, users, chats)
)
})
pts = diff.pts
if (diff.final) break
}
this._cpts[channelId] = pts
}
/**
@ -227,7 +349,8 @@ async function _loadChannelDifference(
*/
export function _handleUpdate(
this: TelegramClient,
update: tl.TypeUpdates
update: tl.TypeUpdates,
noDispatch = false
): void {
// just in case, check that updates state is available
if (this._pts === undefined) {
@ -243,6 +366,8 @@ export function _handleUpdate(
// additionally, locking here blocks updates handling while we are
// loading difference inside update handler.
const noDispatchIndex = noDispatch ? _createNoDispatchIndex(update) : undefined
this._updLock
.acquire()
.then(async () => {
@ -253,7 +378,7 @@ export function _handleUpdate(
// reference: https://core.telegram.org/api/updates
if (update._ === 'updatesTooLong') {
// "there are too many events pending to be pushed to the client", we need to fetch them manually
await _loadDifference.call(this)
await _loadDifference.call(this, noDispatchIndex)
} else if (
update._ === 'updates' ||
update._ === 'updatesCombined'
@ -285,7 +410,8 @@ export function _handleUpdate(
}
return await _loadChannelDifference.call(
this,
upd.channelId
upd.channelId,
noDispatchIndex
)
}
@ -324,21 +450,24 @@ export function _handleUpdate(
// to bother handling them further.
return await _loadChannelDifference.call(
this,
channelId
channelId,
noDispatchIndex
)
} else {
return await _loadDifference.call(this)
}
}
this.dispatchUpdate(upd, users, chats)
if (!noDispatch) {
this.dispatchUpdate(upd, users, chats)
}
if (channelId) {
this._cpts[channelId] = pts
} else {
this._pts = pts
}
} else {
} else if (!noDispatch) {
this.dispatchUpdate(upd, users, chats)
}
}
@ -352,12 +481,14 @@ export function _handleUpdate(
upd.dcOptions
} else if (upd._ === 'updateConfig') {
this._config = await this.call({ _: 'help.getConfig' })
} else {
} else if (!noDispatch) {
this.dispatchUpdate(upd, {}, {})
}
this._date = update.date
} else if (update._ === 'updateShortMessage') {
if (noDispatch) return
const message: tl.RawMessage = {
_: 'message',
out: update.out,
@ -412,7 +543,7 @@ export function _handleUpdate(
id,
})
if (rawUsers.length < 2) {
if (rawUsers.length !== id.length) {
// other user failed to load.
// first try checking for input peer in storage
const saved = await this.storage.getPeerById(
@ -427,7 +558,7 @@ export function _handleUpdate(
})
}
}
if (rawUsers.length < 2) {
if (rawUsers.length !== id.length) {
// not saved (or invalid hash), not found by id
// find that user in dialogs (since the update
// is about an incoming message, dialog with that
@ -473,6 +604,8 @@ export function _handleUpdate(
})
this.dispatchUpdate(message, users, chats)
} else if (update._ === 'updateShortChatMessage') {
if (noDispatch) return
const message: tl.RawMessage = {
_: 'message',
out: update.out,
@ -506,7 +639,6 @@ export function _handleUpdate(
let rawUsers: tl.TypeUser[]
{
const id: tl.TypeInputUser[] = [
{ _: 'inputUserSelf' },
{
_: 'inputUser',
userId: update.fromId,
@ -523,6 +655,41 @@ export function _handleUpdate(
_: 'users.getUsers',
id,
})
if (rawUsers.length !== id.length) {
// user failed to load.
// first try checking for input peer in storage
const saved = await this.storage.getPeerById(
update.fromId
)
if (saved) {
id[0] = normalizeToInputUser(saved)!
rawUsers = await this.call({
_: 'users.getUsers',
id,
})
}
}
if (rawUsers.length !== id.length) {
// not saved (or invalid hash), not found by id
// find that user in chat participants list
const res = await this.call({
_: 'messages.getFullChat',
chatId: update.chatId,
})
const user = res.users.find(
(it) => it.id === update.fromId
)
if (!user) {
debug(
"received updateShortChatMessage, but wasn't able to find User"
)
return
}
rawUsers.push(user)
}
}
const rawChats = await this.call({
_: 'messages.getChats',
@ -548,6 +715,11 @@ export function _handleUpdate(
chats: rawChats,
})
this.dispatchUpdate(message, users, chats)
} else if (update._ === 'updateShortSentMessage') {
// only store the new pts and date values
// we never need to dispatch this
this._date = update.date
this._pts = update.pts
}
})
.catch((err) => this._emitError(err))
@ -560,5 +732,12 @@ export function _handleUpdate(
* @internal
*/
export function catchUp(this: TelegramClient): Promise<void> {
return _loadDifference.call(this)
// we also use a lock here so new updates are not processed
// while we are catching up with older ones
return this._updLock
.acquire()
.then(() => _loadDifference.call(this))
.catch((err) => this._emitError(err))
.then(() => this._updLock.release())
}