diff --git a/packages/client/src/client.ts b/packages/client/src/client.ts index c607c237..379ce245 100644 --- a/packages/client/src/client.ts +++ b/packages/client/src/client.ts @@ -71,13 +71,18 @@ import { setDefaultParseMode, unregisterParseMode, } from './methods/parse-modes/parse-modes' -import { catchUp } from './methods/updates/catch-up' import { _dispatchUpdate, addUpdateHandler, removeUpdateHandler, } from './methods/updates/dispatcher' -import { _handleUpdate } from './methods/updates/handle-update' +import { + _fetchUpdatesState, + _handleUpdate, + _loadStorage, + _saveStorage, + catchUp, +} from './methods/updates/handle-update' import { onNewMessage } from './methods/updates/on-new-message' import { blockUser } from './methods/users/block-user' import { getCommonChats } from './methods/users/get-common-chats' @@ -112,8 +117,15 @@ import { handlers, } from './types' import { MaybeArray, MaybeAsync, TelegramConnection } from '@mtcute/core' +import { Lock } from './utils/lock' export class TelegramClient extends BaseTelegramClient { + // from methods/auth/_initialize.ts + protected _userId: number | null + + // from methods/auth/_initialize.ts + protected _isBot: boolean + // from methods/files/_initialize.ts protected _downloadConnections: Record @@ -129,13 +141,35 @@ export class TelegramClient extends BaseTelegramClient { // from methods/updates/dispatcher.ts protected _groupsOrder: number[] + // from methods/updates/handle-update.ts + protected _updLock: Lock + + // from methods/updates/handle-update.ts + protected _pts: number + + // from methods/updates/handle-update.ts + protected _date: number + + // from methods/updates/handle-update.ts + protected _cpts: Record + constructor(opts: BaseTelegramClient.Options) { super(opts) + this._userId = null + this._isBot = false this._downloadConnections = {} this._parseModes = {} this._defaultParseMode = null this._groups = {} this._groupsOrder = [] + this._updLock = new Lock() + // we dont need to initialize state fields since + // they are always loaded either from the server, or from storage. + + // channel PTS are not loaded immediately, and instead are cached here + // after the first time they were retrieved from the storage. + // they are later pushed into the storage. + this._cpts = {} } /** @@ -1714,18 +1748,11 @@ export class TelegramClient extends BaseTelegramClient { setDefaultParseMode(name: string): void { return setDefaultParseMode.apply(this, arguments) } - /** - * Catch up with the server by loading missed updates. - * - */ - catchUp(): Promise { - return catchUp.apply(this, arguments) - } protected _dispatchUpdate( - update: tl.TypeUpdate, + update: tl.TypeUpdate | tl.TypeMessage, users: Record, chats: Record - ): Promise { + ): void { return _dispatchUpdate.apply(this, arguments) } /** @@ -1750,9 +1777,30 @@ export class TelegramClient extends BaseTelegramClient { ): void { return removeUpdateHandler.apply(this, arguments) } + /** + * Fetch updates state from the server. + * Meant to be used right after authorization, + * but before force-saving the session. + */ + protected _fetchUpdatesState(): Promise { + return _fetchUpdatesState.apply(this, arguments) + } + protected _loadStorage(): Promise { + return _loadStorage.apply(this, arguments) + } + protected _saveStorage(): Promise { + return _saveStorage.apply(this, arguments) + } protected _handleUpdate(update: tl.TypeUpdates): void { return _handleUpdate.apply(this, arguments) } + /** + * Catch up with the server by loading missed updates. + * + */ + catchUp(): Promise { + return catchUp.apply(this, arguments) + } /** * Register a message handler without any filters. * diff --git a/packages/client/src/methods/_imports.ts b/packages/client/src/methods/_imports.ts index 4880aa7e..418dc4fe 100644 --- a/packages/client/src/methods/_imports.ts +++ b/packages/client/src/methods/_imports.ts @@ -35,3 +35,6 @@ import { // @copy import { MaybeArray, MaybeAsync, TelegramConnection } from '@mtcute/core' + +// @copy +import { Lock } from '../utils/lock' diff --git a/packages/client/src/methods/auth/_initialize.ts b/packages/client/src/methods/auth/_initialize.ts new file mode 100644 index 00000000..bf597b79 --- /dev/null +++ b/packages/client/src/methods/auth/_initialize.ts @@ -0,0 +1,17 @@ +import { TelegramClient } from '../../client' + +// @extension +interface AuthState { + // local copy of "self" in storage, + // so we can use it w/out relying on storage. + // they are both loaded and saved to storage along with the updates + // (see methods/updates/handle-update) + _userId: number | null + _isBot: boolean +} + +// @initialize +function _initializeAuthState(this: TelegramClient) { + this._userId = null + this._isBot = false +} diff --git a/packages/client/src/methods/auth/check-password.ts b/packages/client/src/methods/auth/check-password.ts index 2eeb01c5..15e2cec4 100644 --- a/packages/client/src/methods/auth/check-password.ts +++ b/packages/client/src/methods/auth/check-password.ts @@ -37,10 +37,9 @@ export async function checkPassword( 'user' ) - await this.storage.setSelf({ - userId: res.user.id, - isBot: false, - }) + this._userId = res.user.id + this._isBot = false + await this._fetchUpdatesState() await this._saveStorage() return new User(this, res.user) diff --git a/packages/client/src/methods/auth/log-out.ts b/packages/client/src/methods/auth/log-out.ts index 6f592737..ba62bbe8 100644 --- a/packages/client/src/methods/auth/log-out.ts +++ b/packages/client/src/methods/auth/log-out.ts @@ -17,6 +17,9 @@ export async function logOut( await this.call({ _: 'auth.logOut' }) if (resetSession) { + this._userId = null + this._isBot = false + this._pts = this._seq = this._date = undefined as any this.storage.reset() await this._saveStorage() } diff --git a/packages/client/src/methods/auth/recover-password.ts b/packages/client/src/methods/auth/recover-password.ts index 3c780079..55b7fc1c 100644 --- a/packages/client/src/methods/auth/recover-password.ts +++ b/packages/client/src/methods/auth/recover-password.ts @@ -30,10 +30,8 @@ export async function recoverPassword( 'user' ) - await this.storage.setSelf({ - userId: res.user.id, - isBot: false, - }) + this._userId = res.user.id + this._isBot = false await this._saveStorage() return new User(this, res.user) diff --git a/packages/client/src/methods/auth/sign-in-bot.ts b/packages/client/src/methods/auth/sign-in-bot.ts index 57e3f198..419f00d1 100644 --- a/packages/client/src/methods/auth/sign-in-bot.ts +++ b/packages/client/src/methods/auth/sign-in-bot.ts @@ -33,10 +33,9 @@ export async function signInBot( 'user' ) - await this.storage.setSelf({ - userId: res.user.id, - isBot: true, - }) + this._userId = res.user.id + this._isBot = true + await this._fetchUpdatesState() await this._saveStorage() return new User(this, res.user) diff --git a/packages/client/src/methods/auth/sign-in.ts b/packages/client/src/methods/auth/sign-in.ts index e84e46b3..7193c51b 100644 --- a/packages/client/src/methods/auth/sign-in.ts +++ b/packages/client/src/methods/auth/sign-in.ts @@ -41,10 +41,9 @@ export async function signIn( assertTypeIs('signIn (@ auth.signIn -> user)', res.user, 'user') - await this.storage.setSelf({ - userId: res.user.id, - isBot: false, - }) + this._userId = res.user.id + this._isBot = false + await this._fetchUpdatesState() await this._saveStorage() return new User(this, res.user) diff --git a/packages/client/src/methods/auth/sign-up.ts b/packages/client/src/methods/auth/sign-up.ts index 226ef88a..ccbaa5df 100644 --- a/packages/client/src/methods/auth/sign-up.ts +++ b/packages/client/src/methods/auth/sign-up.ts @@ -32,10 +32,9 @@ export async function signUp( assertTypeIs('signUp (@ auth.signUp)', res, 'auth.authorization') assertTypeIs('signUp (@ auth.signUp -> user)', res.user, 'user') - await this.storage.setSelf({ - userId: res.user.id, - isBot: false, - }) + this._userId = res.user.id + this._isBot = false + await this._fetchUpdatesState() await this._saveStorage() return new User(this, res.user) diff --git a/packages/client/src/methods/auth/start.ts b/packages/client/src/methods/auth/start.ts index 52788728..8ec4f5a3 100644 --- a/packages/client/src/methods/auth/start.ts +++ b/packages/client/src/methods/auth/start.ts @@ -222,7 +222,7 @@ export async function start( if (e instanceof PasswordHashInvalidError) { if (params.invalidCodeCallback) { - params.invalidCodeCallback('password') + await params.invalidCodeCallback('password') } else { console.log('Invalid password. Please try again') } @@ -245,7 +245,7 @@ export async function start( if (result instanceof TermsOfService && !params.acceptTos) { if (params.tosCallback) { - params.tosCallback(result) + await params.tosCallback(result) } else { console.log(result.text) } diff --git a/packages/client/src/methods/updates/catch-up.ts b/packages/client/src/methods/updates/catch-up.ts deleted file mode 100644 index 6d520ddc..00000000 --- a/packages/client/src/methods/updates/catch-up.ts +++ /dev/null @@ -1,83 +0,0 @@ -import { TelegramClient } from '../../client' -import { tl } from '@mtcute/tl' - -const debug = require('debug')('mtcute:upds') - -/** - * Catch up with the server by loading missed updates. - * - * @internal - */ -export async function catchUp(this: TelegramClient): Promise { - // this doesn't work with missed channel updates properly - // todo: fix - const state = await this.storage.getCommonPts() - if (!state) return - - let [pts, date] = state - - let error: Error | null = null - try { - for (;;) { - const diff = await this.call({ - _: 'updates.getDifference', - pts, - date, - qts: 0, - }) - - if ( - diff._ === 'updates.difference' || - diff._ === 'updates.differenceSlice' - ) { - const state = - diff._ === 'updates.difference' - ? diff.state - : diff.intermediateState - pts = state.pts - date = state.date - - this._handleUpdate({ - _: 'updates', - users: diff.users, - chats: diff.chats, - date: state.date, - seq: state.seq, - updates: [ - ...diff.otherUpdates, - ...diff.newMessages.map( - (m) => - ({ - _: 'updateNewMessage', - message: m, - pts: 0, - ptsCount: 0, - } as tl.RawUpdateNewMessage) - ), - ], - }) - - debug( - 'catching up... processed %d updates and %d messages', - diff.otherUpdates.length, - diff.newMessages.length - ) - } else { - if (diff._ === 'updates.differenceEmpty') { - date = diff.date - } else if (diff._ === 'updates.differenceTooLong') { - pts = diff.pts - } - break - } - } - } catch (e) { - error = e - debug('error while catching up: ' + error) - } - - debug('caught up') - - await this.storage.setCommonPts([pts, date]) - await this.storage.save?.() -} diff --git a/packages/client/src/methods/updates/dispatcher.ts b/packages/client/src/methods/updates/dispatcher.ts index 68d65983..b9be09f9 100644 --- a/packages/client/src/methods/updates/dispatcher.ts +++ b/packages/client/src/methods/updates/dispatcher.ts @@ -7,6 +7,7 @@ import { PropagationSymbol, StopPropagation, } from '../../types' +import { createUsersChatsIndex } from '../../utils/peer-utils' // @extension interface DispatcherExtension { @@ -23,47 +24,71 @@ function _initializeDispatcher() { /** * @internal */ -export async function _dispatchUpdate( +export function _dispatchUpdate( this: TelegramClient, - update: tl.TypeUpdate, + update: tl.TypeUpdate | tl.TypeMessage, users: Record, chats: Record -): Promise { - let message: Message | null = null - if ( - update._ === 'updateNewMessage' || - update._ === 'updateNewChannelMessage' || - update._ === 'updateNewScheduledMessage' || - update._ === 'updateEditMessage' || - update._ === 'updateEditChannelMessage' - ) { - message = new Message(this, update.message, users, chats) - } +): void { + ;(async () => { + // order does not matter in the dispatcher, + // so we can handle each update in its own task - for (const grp of this._groupsOrder) { - for (const handler of this._groups[grp]) { - let result: void | PropagationSymbol + const isRawMessage = tl.isAnyMessage(update) - if ( - handler.type === 'raw' && - (!handler.check || - (await handler.check(this, update, users, chats))) - ) { - result = await handler.callback(this, update, users, chats) - } else if ( - handler.type === 'new_message' && - message && - (!handler.check || (await handler.check(message, this))) - ) { - result = await handler.callback(message, this) - } else continue - - if (result === ContinuePropagation) continue - if (result === StopPropagation) return - - break + let message: Message | null = null + if ( + update._ === 'updateNewMessage' || + update._ === 'updateNewChannelMessage' || + update._ === 'updateNewScheduledMessage' || + update._ === 'updateEditMessage' || + update._ === 'updateEditChannelMessage' || + isRawMessage + ) { + message = new Message( + this, + isRawMessage ? update : (update as any).message, + users, + chats + ) } - } + + for (const grp of this._groupsOrder) { + for (const handler of this._groups[grp]) { + let result: void | PropagationSymbol + + if ( + handler.type === 'raw' && + !isRawMessage && + (!handler.check || + (await handler.check( + this, + update as any, + users, + chats + ))) + ) { + result = await handler.callback( + this, + update as any, + users, + chats + ) + } else if ( + handler.type === 'new_message' && + message && + (!handler.check || (await handler.check(message, this))) + ) { + result = await handler.callback(message, this) + } else continue + + if (result === ContinuePropagation) continue + if (result === StopPropagation) return + + break + } + } + })().catch((err) => this._emitError(err)) } /** diff --git a/packages/client/src/methods/updates/handle-update.ts b/packages/client/src/methods/updates/handle-update.ts index 138323e9..bae0d8e1 100644 --- a/packages/client/src/methods/updates/handle-update.ts +++ b/packages/client/src/methods/updates/handle-update.ts @@ -1,12 +1,199 @@ import { tl } from '@mtcute/tl' import { TelegramClient } from '../../client' -import { ChannelPrivateError } from '@mtcute/tl/errors' -import { MAX_CHANNEL_ID } from '@mtcute/core' -import { createUsersChatsIndex, normalizeToInputChannel } from '../../utils/peer-utils' +import { + createUsersChatsIndex, + normalizeToInputChannel, + normalizeToInputUser, + peerToInputPeer, +} from '../../utils/peer-utils' import { extractChannelIdFromUpdate } from '../../utils/misc-utils' +import { Lock } from '../../utils/lock' +import bigInt from 'big-integer' +import { MAX_CHANNEL_ID } from '../../../../core' const debug = require('debug')('mtcute:upds') +// i tried to implement updates seq, but that thing seems to be +// broken on the server side, lol (see https://t.me/teispam/1199, ru) +// tldr server sends multiple `updates` with the same seq, and that seq +// is also larger than the seq in the initial updates.getState response + +// @extension +interface UpdatesState { + _updLock: Lock + + // accessing storage every time might be expensive, + // so store everything here, and load & save + // every time session is loaded & saved. + _pts: number + _date: number + // _seq: number + _cpts: Record +} + +// @initialize +function _initializeUpdates(this: TelegramClient) { + this._updLock = new Lock() + // we dont need to initialize state fields since + // they are always loaded either from the server, or from storage. + + // channel PTS are not loaded immediately, and instead are cached here + // after the first time they were retrieved from the storage. + // they are later pushed into the storage. + this._cpts = {} +} + +/** + * Fetch updates state from the server. + * Meant to be used right after authorization, + * but before force-saving the session. + * @internal + */ +export async function _fetchUpdatesState(this: TelegramClient): Promise { + const state = await this.call({ _: 'updates.getState' }) + this._pts = state.pts + this._date = state.date + // this._seq = state.seq + debug( + 'loaded initial state: pts=%d, date=%d', // , seq=%d', + state.pts, + state.date + // state.seq + ) +} + +/** + * @internal + */ +export async function _loadStorage(this: TelegramClient): Promise { + // load updates state from the session + await this.storage.load?.() + const state = await this.storage.getCommonPts() + if (state) { + this._pts = state[0] + this._date = state[1] + // this._seq = state[2] + } + // if no state, don't bother initializing properties + // since that means that there is no authorization, + // and thus _fetchUpdatesState will be called + + const self = await this.storage.getSelf() + if (self) { + this._userId = self.userId + this._isBot = self.isBot + } +} + +/** + * @internal + */ +export async function _saveStorage(this: TelegramClient): Promise { + // save updates state to the session + + // before any authorization pts will be undefined + if (this._pts !== undefined) { + await this.storage.setCommonPts([this._pts, this._date]) // , this._seq]) + await this.storage.setManyChannelPts(this._cpts) + } + if (this._userId !== null) { + await this.storage.setSelf({ + userId: this._userId, + isBot: this._isBot, + }) + } + + await this.storage.save?.() +} + +async function _loadDifference(this: TelegramClient): Promise { + for (;;) { + const diff = await this.call({ + _: 'updates.getDifference', + pts: this._pts, + date: this._date, + qts: 0, + }) + + if ( + diff._ === 'updates.differenceEmpty' || + diff._ === 'updates.differenceTooLong' + ) + return + + const state = + diff._ === 'updates.difference' + ? diff.state + : diff.intermediateState + + await this._cachePeersFrom(diff) + + const { users, chats } = createUsersChatsIndex(diff) + + diff.newMessages.forEach((message) => + this._dispatchUpdate(message, users, chats) + ) + diff.otherUpdates.forEach((upd) => + this._dispatchUpdate(upd, users, chats) + ) + + this._pts = state.pts + this._date = state.date + + if (diff._ === 'updates.difference') return + } +} + +async function _loadChannelDifference( + this: TelegramClient, + channelId: number +): Promise { + let channel + try { + channel = normalizeToInputChannel( + await this.resolvePeer(MAX_CHANNEL_ID - channelId) + )! + } catch (e) { + return + } + + let pts = this._cpts[channelId] + if (!pts) { + pts = (await this.storage.getChannelPts(channelId)) ?? 0 + } + + for (;;) { + const diff = await this.call({ + _: 'updates.getChannelDifference', + channel, + pts, + limit: this._isBot ? 1000 : 100, + filter: { _: 'channelMessagesFilterEmpty' }, + }) + + if ( + diff._ === 'updates.channelDifferenceEmpty' || + diff._ === 'updates.channelDifferenceTooLong' + ) + return + + await this._cachePeersFrom(diff) + + const { users, chats } = createUsersChatsIndex(diff) + + diff.newMessages.forEach((message) => + this._dispatchUpdate(message, users, chats) + ) + diff.otherUpdates.forEach((upd) => + this._dispatchUpdate(upd, users, chats) + ) + + pts = diff.pts + + if (diff.final) break + } +} + /** * @internal */ @@ -14,177 +201,287 @@ export function _handleUpdate( this: TelegramClient, update: tl.TypeUpdates ): void { - ;(async () => { - debug('received %s', update._) + // just in case, check that updates state is available + if (this._pts === undefined) { + debug('received an update before updates state is available') + return + } - // https://github.com/pyrogram/pyrogram/blob/a86656aefcc93cc3d2f5c98227d5da28fcddb136/pyrogram/client.py#L521 - if (update._ === 'updates' || update._ === 'updatesCombined') { - const isMin = await this._cachePeersFrom(update) + // we want to process updates in order, so we use a lock + // it is *very* important that the lock is released, otherwise + // the incoming updates will be stuck forever, eventually killing the process with OOM + // thus, we wrap everything in what basically is a try..finally - const { users, chats } = createUsersChatsIndex(update) + // additionally, locking here blocks updates handling while we are + // loading difference inside update handler. - for (const upd of update.updates) { - if (upd._ === 'updateChannelTooLong') { - // what are we supposed to do with this? - debug( - 'received updateChannelTooLong for channel %d (pts %d)', - upd.channelId, - upd.pts - ) - continue - } + this._updLock + .acquire() + .then(async () => { + debug('received %s', update._) - const channelId = extractChannelIdFromUpdate(upd) - const pts = 'pts' in upd ? upd.pts : undefined - const ptsCount = 'ptsCount' in upd ? upd.ptsCount : undefined - const date = 'date' in upd ? upd.date : undefined + // i tried my best to follow the documentation, but i still may have missed something. + // feel free to contribute! + // reference: https://core.telegram.org/api/updates + if (update._ === 'updatesTooLong') { + // "there are too many events pending to be pushed to the client", we need to fetch them manually + await _loadDifference.call(this) + } else if ( + update._ === 'updates' || + update._ === 'updatesCombined' + ) { + // const seqStart = + // update._ === 'updatesCombined' + // ? update.seqStart + // : update.seq + // const nextLocalSeq = this._seq + 1 + // + // debug('received %s (seq_start=%d, seq_end=%d)', update._, seqStart, update.seq) + // + // if (nextLocalSeq > seqStart) + // // "the updates were already applied, and must be ignored" + // return + // if (nextLocalSeq < seqStart) + // // "there's an updates gap that must be filled" + // // loading difference will also load any updates contained + // // in this update, so we discard it + // return await _loadDifference.call(this) - if (upd._ === 'updateNewChannelMessage' && isMin) { - // min entities are useless, so we need to fetch actual entities - const msg = upd.message + await this._cachePeersFrom(update) + const { users, chats } = createUsersChatsIndex(update) - if (msg._ !== 'messageEmpty') { - let diff: - | tl.RpcCallReturn['updates.getChannelDifference'] - | null = null + for (const upd of update.updates) { + if (upd._ === 'updateChannelTooLong') { + if (upd.pts) { + this._cpts[upd.channelId] = upd.pts + } + return await _loadChannelDifference.call(this, upd.channelId) + } - const channel = normalizeToInputChannel( - await this.resolvePeer(MAX_CHANNEL_ID - channelId!) - ) - if (!channel) return + const channelId = extractChannelIdFromUpdate(upd) + const pts = 'pts' in upd ? upd.pts : undefined + const ptsCount = + 'ptsCount' in upd ? upd.ptsCount : undefined - try { - diff = await this.call({ - _: 'updates.getChannelDifference', - channel: channel, - filter: { - _: 'channelMessagesFilter', - ranges: [ - { - _: 'messageRange', - minId: upd.message.id, - maxId: upd.message.id, - }, - ], - }, - pts: pts! - ptsCount!, - limit: pts!, - }) - } catch (e) { - if (!(e instanceof ChannelPrivateError)) throw e + if (pts !== undefined && ptsCount !== undefined) { + let nextLocalPts + if (channelId === undefined) + nextLocalPts = this._pts + ptsCount + else if (channelId in this._cpts) + nextLocalPts = this._cpts[channelId] + ptsCount + else { + const saved = await this.storage.getChannelPts( + channelId + ) + if (saved) { + this._cpts[channelId] = saved + nextLocalPts = saved + ptsCount + } else { + nextLocalPts = null + } } - if ( - diff && - diff._ !== 'updates.channelDifferenceEmpty' - ) { - diff.users.forEach((u) => (users[u.id] = u)) - diff.chats.forEach((u) => (chats[u.id] = u)) + if (nextLocalPts) { + if (nextLocalPts > pts) + // "the update was already applied, and must be ignored" + return + if (nextLocalPts < pts) + // "there's an update gap that must be filled" + // same as before, loading diff will also load + // any of the pending updates, so we don't need + // to bother handling them further. + if (channelId) { + return await _loadChannelDifference.call(this, channelId) + } else { + return await _loadDifference.call(this) + } } + + this._dispatchUpdate(upd, users, chats) + + if (channelId) { + this._cpts[channelId] = pts + } else { + this._pts = pts + } + } else { + this._dispatchUpdate(upd, users, chats) } } - if (channelId && pts) { - await this.storage.setChannelPts(channelId, pts) - } - if (!channelId && (pts || date)) { - await this.storage.setCommonPts([pts || null, date || null]) + // this._seq = update.seq + this._date = update.date + } else if (update._ === 'updateShort') { + const upd = update.update + if (upd._ === 'updateDcOptions' && this._config) { + ;(this._config as tl.Mutable).dcOptions = + upd.dcOptions + } else if (upd._ === 'updateConfig') { + this._config = await this.call({ _: 'help.getConfig' }) + } else { + this._dispatchUpdate(upd, {}, {}) } - await this._dispatchUpdate(upd, users, chats) - } + this._date = update.date + } else if (update._ === 'updateShortMessage') { + const message: tl.RawMessage = { + _: 'message', + out: update.out, + mentioned: update.mentioned, + mediaUnread: update.mediaUnread, + silent: update.silent, + id: update.id, + fromId: { + _: 'peerUser', + userId: update.out ? this._userId! : update.userId, + }, + peerId: { + _: 'peerUser', + userId: update.userId, + }, + fwdFrom: update.fwdFrom, + viaBotId: update.viaBotId, + replyTo: update.replyTo, + date: update.date, + message: update.message, + entities: update.entities, + ttlPeriod: update.ttlPeriod, + } - await this.storage.setCommonPts([null, update.date]) - // } else if (update._ === 'updateShortMessage') { - // const self = await this.storage.getSelf() - // - // const message: tl.RawMessage = { - // _: 'message', - // out: update.out, - // mentioned: update.mentioned, - // mediaUnread: update.mediaUnread, - // silent: update.silent, - // id: update.id, - // fromId: { - // _: 'peerUser', - // userId: update.out ? self!.userId : update.userId - // }, - // peerId: { - // _: 'peerUser', - // userId: update.userId - // }, - // fwdFrom: update.fwdFrom, - // viaBotId: update.viaBotId, - // replyTo: update.replyTo, - // date: update.date, - // message: update.message, - // entities: update.entities, - // ttlPeriod: update.ttlPeriod - // } - // } else if (update._ === 'updateShortChatMessage') { - // const message: tl.RawMessage = { - // _: 'message', - // out: update.out, - // mentioned: update.mentioned, - // mediaUnread: update.mediaUnread, - // silent: update.silent, - // id: update.id, - // fromId: { - // _: 'peerUser', - // userId: update.fromId - // }, - // peerId: { - // _: 'peerChat', - // chatId: update.chatId - // }, - // fwdFrom: update.fwdFrom, - // viaBotId: update.viaBotId, - // replyTo: update.replyTo, - // date: update.date, - // message: update.message, - // entities: update.entities, - // ttlPeriod: update.ttlPeriod - // } - // - } else if ( - update._ === 'updateShortMessage' || - update._ === 'updateShortChatMessage' - ) { - await this.storage.setCommonPts([update.pts, update.date]) + // now we need to fetch info about users involved. + // since this update is only used for PM, we can just + // fetch the current user and the other user. + // additionally, we need to handle "forwarded from" + // field, as it may contain a user OR a channel + const fwdFrom = update.fwdFrom?.fromId + ? peerToInputPeer(update.fwdFrom.fromId) + : undefined - // these short updates don't contain users & chats, - // so we use updates.getDifference to fetch them - // definitely not the best way, but whatever - const diff = await this.call({ - _: 'updates.getDifference', - pts: update.pts - update.ptsCount, - date: update.date, - qts: -1, - }) - - if (diff._ === 'updates.difference') { - if (diff.newMessages.length) { - const { users, chats } = createUsersChatsIndex(diff) - - await this._dispatchUpdate( + let rawUsers: tl.TypeUser[] + { + const id: tl.TypeInputUser[] = [ + { _: 'inputUserSelf' }, { - _: 'updateNewMessage', - message: diff.newMessages[0], - pts: update.pts, - ptsCount: update.ptsCount, + _: 'inputUser', + userId: update.userId, + accessHash: bigInt.zero, }, - users, - chats - ) - } else if (diff.otherUpdates.length) { - await this._dispatchUpdate(diff.otherUpdates[0], {}, {}) + ] + + if (fwdFrom) { + const inputUser = normalizeToInputUser(fwdFrom) + if (inputUser) id.push(inputUser) + } + + rawUsers = await this.call({ + _: 'users.getUsers', + id, + }) } + let rawChats: tl.TypeChat[] = [] + if (fwdFrom) { + const inputChannel = normalizeToInputChannel(fwdFrom) + if (inputChannel) + rawChats = await this.call({ + _: 'channels.getChannels', + id: [inputChannel], + }).then((res) => res.chats) + } + + this._date = update.date + + const { users, chats } = createUsersChatsIndex({ + users: rawUsers, + chats: rawChats, + }) + this._dispatchUpdate(message, users, chats) + } else if (update._ === 'updateShortChatMessage') { + const message: tl.RawMessage = { + _: 'message', + out: update.out, + mentioned: update.mentioned, + mediaUnread: update.mediaUnread, + silent: update.silent, + id: update.id, + fromId: { + _: 'peerUser', + userId: update.fromId, + }, + peerId: { + _: 'peerChat', + chatId: update.chatId, + }, + fwdFrom: update.fwdFrom, + viaBotId: update.viaBotId, + replyTo: update.replyTo, + date: update.date, + message: update.message, + entities: update.entities, + ttlPeriod: update.ttlPeriod, + } + + // similarly to updateShortMessage, we need to fetch the sender + // user and the chat, and also handle "forwarded from" info. + const fwdFrom = update.fwdFrom?.fromId + ? peerToInputPeer(update.fwdFrom.fromId) + : undefined + + let rawUsers: tl.TypeUser[] + { + const id: tl.TypeInputUser[] = [ + { _: 'inputUserSelf' }, + { + _: 'inputUser', + userId: update.fromId, + accessHash: bigInt.zero, + }, + ] + + if (fwdFrom) { + const inputUser = normalizeToInputUser(fwdFrom) + if (inputUser) id.push(inputUser) + } + + rawUsers = await this.call({ + _: 'users.getUsers', + id, + }) + } + const rawChats = await this.call({ + _: 'messages.getChats', + id: [update.chatId], + }).then((res) => res.chats) + + if (fwdFrom) { + const inputChannel = normalizeToInputChannel(fwdFrom) + if (inputChannel) { + const res = await this.call({ + _: 'channels.getChannels', + id: [inputChannel], + }) + rawChats.push(...res.chats) + } + } + + this._date = update.date + const { users, chats } = createUsersChatsIndex({ + users: rawUsers, + chats: rawChats, + }) + this._dispatchUpdate(message, users, chats) } - } else if (update._ === 'updateShort') { - await this._dispatchUpdate(update.update, {}, {}) - await this.storage.setCommonPts([null, update.date]) - } else if (update._ === 'updatesTooLong') { - debug('got updatesTooLong') - } - })().catch((err) => this._emitError(err)) + }) + .catch((err) => this._emitError(err)) + .then(() => this._updLock.release()) } + + +/** + * Catch up with the server by loading missed updates. + * + * @internal + */ +export function catchUp(this: TelegramClient): Promise { + return _loadDifference.call(this) +} + diff --git a/packages/client/src/types/messages/message.ts b/packages/client/src/types/messages/message.ts index a94738b9..373f06c0 100644 --- a/packages/client/src/types/messages/message.ts +++ b/packages/client/src/types/messages/message.ts @@ -150,9 +150,7 @@ export namespace Message { readonly inviter: number } - export interface MessageForwardInfo< - Sender extends User | Chat | string = User | Chat | string - > { + export interface MessageForwardInfo { /** * Date the original message was sent */ @@ -162,7 +160,7 @@ export namespace Message { * Sender of the original message (either user or a channel) * or their name (for users with private forwards) */ - sender: Sender + sender: User | Chat | string /** * For messages forwarded from channels, diff --git a/packages/client/src/types/updates/filters.ts b/packages/client/src/types/updates/filters.ts index 19445838..f36fdeb7 100644 --- a/packages/client/src/types/updates/filters.ts +++ b/packages/client/src/types/updates/filters.ts @@ -1,7 +1,7 @@ import { TelegramClient } from '../../client' import { MaybeArray, MaybeAsync } from '@mtcute/core' import { Message } from '../messages' -import { User } from '../peers' +import { Chat, User } from '../peers' import { Dice, Photo, @@ -285,6 +285,14 @@ export namespace filters { export const bot: UpdateFilter = (msg) => msg.sender instanceof User && msg.sender.isBot + /** + * Filter messages sent in broadcast channels + */ + export const channel: UpdateFilter< + Message, + { chat: Modify } + > = (msg) => msg.chat.type === 'channel' + /** * Filter incoming messages. * diff --git a/packages/client/src/utils/lock.ts b/packages/client/src/utils/lock.ts new file mode 100644 index 00000000..0e066745 --- /dev/null +++ b/packages/client/src/utils/lock.ts @@ -0,0 +1,24 @@ +/** @internal */ +export class Lock { + private _prom: Promise | null = null + private _unlock: (() => void) | null = null + + constructor() { + this._prom = null + this._unlock = null + } + + async acquire(): Promise { + if (this._prom) await this._prom + this._prom = new Promise((resolve) => { + this._unlock = resolve + }) + } + + release(): void { + if (!this._unlock) return + this._unlock() + this._prom = null + this._unlock = null + } +} diff --git a/packages/client/src/utils/peer-utils.ts b/packages/client/src/utils/peer-utils.ts index 05490293..c627f3ad 100644 --- a/packages/client/src/utils/peer-utils.ts +++ b/packages/client/src/utils/peer-utils.ts @@ -1,4 +1,5 @@ import { tl } from '@mtcute/tl' +import bigInt from 'big-integer' export const INVITE_LINK_REGEX = /^(?:https?:\/\/)?(?:www\.)?(?:t(?:elegram)?\.(?:org|me|dog)\/joinchat\/)([\w-]+)$/i @@ -77,7 +78,19 @@ export function inputPeerToPeer(inp: tl.TypeInputPeer): tl.TypePeer { if (inp._ === 'inputPeerChat') return { _: 'peerChat', chatId: inp.chatId } - return inp as never + throw new Error(`Cannot convert ${inp._} to peer`) +} + +export function peerToInputPeer(peer: tl.TypePeer, accessHash = bigInt.zero): tl.TypeInputPeer { + if (peer._ === 'peerUser') + return { _: 'inputPeerUser', userId: peer.userId, accessHash } + + if (peer._ === 'peerChannel') + return { _: 'inputPeerChannel', channelId: peer.channelId, accessHash } + + if (peer._ === 'peerChat') return { _: 'inputPeerChat', chatId: peer.chatId } + + return peer as never } export function createUsersChatsIndex( diff --git a/packages/core/src/base-client.ts b/packages/core/src/base-client.ts index 9def8808..4101e6a1 100644 --- a/packages/core/src/base-client.ts +++ b/packages/core/src/base-client.ts @@ -207,8 +207,8 @@ export class BaseTelegramClient { private _lastRequestTime = 0 private _floodWaitedRequests: Record = {} - private _config?: tl.RawConfig - private _cdnConfig?: tl.RawCdnConfig + protected _config?: tl.RawConfig + protected _cdnConfig?: tl.RawCdnConfig private _additionalConnections: TelegramConnection[] = [] @@ -639,15 +639,13 @@ export class BaseTelegramClient { /** * Adds all peers from a given object to entity cache in storage. - * Returns boolean indicating whether there were any `min` entities. */ - protected async _cachePeersFrom(obj: any): Promise { - let isMin = false + protected async _cachePeersFrom(obj: any): Promise { const parsedPeers: ITelegramStorage.PeerInfo[] = [] for (const peer of getAllPeersFrom(obj)) { - if ('min' in peer && peer.min) { - isMin = true + if ((peer as any).min && !peer.fromMessage && !(peer as any).bot) { + debug('peer is min, but no context was found: %o %o', obj, peer) continue } @@ -659,6 +657,7 @@ export class BaseTelegramClient { phone: peer.phone ?? null, type: peer.bot ? 'bot' : 'user', updated: 0, + fromMessage: peer.fromMessage }) } else if (peer._ === 'chat' || peer._ === 'chatForbidden') { parsedPeers.push({ @@ -668,6 +667,7 @@ export class BaseTelegramClient { phone: null, type: 'group', updated: 0, + fromMessage: peer.fromMessage }) } else if (peer._ === 'channel' || peer._ === 'channelForbidden') { parsedPeers.push({ @@ -680,12 +680,11 @@ export class BaseTelegramClient { phone: null, type: peer.broadcast ? 'channel' : 'supergroup', updated: 0, + fromMessage: peer.fromMessage }) } } await this.storage.updatePeers(parsedPeers) - - return isMin } } diff --git a/packages/core/src/storage/abstract.ts b/packages/core/src/storage/abstract.ts index 5ff8c49e..f8bb89db 100644 --- a/packages/core/src/storage/abstract.ts +++ b/packages/core/src/storage/abstract.ts @@ -11,37 +11,14 @@ export namespace ITelegramStorage { username: string | null phone: string | null updated: number + // marked peer id of chat, message id + fromMessage?: [number, number] } export interface SelfInfo { isBot: boolean userId: number } - - export function getInputPeer(peerInfo?: PeerInfo): tl.TypeInputPeer | null { - if (!peerInfo) return null - if (peerInfo.type === 'user' || peerInfo.type === 'bot') - return { - _: 'inputPeerUser', - userId: peerInfo.id, - accessHash: peerInfo.accessHash, - } - - if (peerInfo.type === 'group') - return { - _: 'inputPeerChat', - chatId: -peerInfo.id, - } - - if (peerInfo.type === 'channel' || peerInfo.type === 'supergroup') - return { - _: 'inputPeerChannel', - channelId: MAX_CHANNEL_ID - peerInfo.id, - accessHash: peerInfo.accessHash, - } - - throw new Error(`Invalid peer type: ${peerInfo.type}`) - } } /** @@ -135,15 +112,14 @@ export interface ITelegramStorage { getChannelPts(entityId: number): MaybeAsync /** * Set common `pts` and `date` values - * - * `null` values in the tuple are replaced with the current value, - * `null` as a `val` will remove common pts */ - setCommonPts(val: [number | null, number | null] | null): MaybeAsync + setCommonPts(val: [number, number]): MaybeAsync /** - * Set channel `pts` value + * Set channels `pts` values in batch. + * Storage is supposed to replace stored channel `pts` values + * with given in the object (key is unmarked peer id, value is the `pts`) */ - setChannelPts(entityId: number, pts: number | null): MaybeAsync + setManyChannelPts(values: Record): MaybeAsync // TODO! // exportToString(): MaybeAsync diff --git a/packages/core/src/storage/memory.ts b/packages/core/src/storage/memory.ts index 3cdde585..fad201ca 100644 --- a/packages/core/src/storage/memory.ts +++ b/packages/core/src/storage/memory.ts @@ -1,6 +1,7 @@ import { ITelegramStorage } from './abstract' import { MaybeAsync } from '../types' import { tl } from '@mtcute/tl' +import { MAX_CHANNEL_ID } from '../utils/peer-utils' const CURRENT_VERSION = 1 @@ -18,7 +19,9 @@ interface MemorySessionState { // username -> peer id usernameIndex: Record + // common pts, date gpts: [number, number] | null + // channel pts pts: Record self: ITelegramStorage.SelfInfo | null @@ -99,6 +102,9 @@ export class MemoryStorage implements ITelegramStorage { peer.updated = Date.now() const old = this._state.entities[peer.id] if (old) { + // min peer + if (peer.fromMessage) continue + // delete old index entries if needed if (old.username && old.username !== peer.username) { delete this._state.usernameIndex[old.username] @@ -115,16 +121,60 @@ export class MemoryStorage implements ITelegramStorage { } } + protected _getInputPeer(peerInfo?: ITelegramStorage.PeerInfo): tl.TypeInputPeer | null { + if (!peerInfo) return null + if (peerInfo.type === 'user' || peerInfo.type === 'bot') { + if (peerInfo.fromMessage) { + return { + _: 'inputPeerUserFromMessage', + peer: this.getPeerById(peerInfo.fromMessage[0])!, + msgId: peerInfo.fromMessage[1], + userId: peerInfo.id + } + } + return { + _: 'inputPeerUser', + userId: peerInfo.id, + accessHash: peerInfo.accessHash, + } + } + + if (peerInfo.type === 'group') + return { + _: 'inputPeerChat', + chatId: -peerInfo.id, + } + + if (peerInfo.type === 'channel' || peerInfo.type === 'supergroup') { + if (peerInfo.fromMessage) { + return { + _: 'inputPeerChannelFromMessage', + peer: this.getPeerById(peerInfo.fromMessage[0])!, + msgId: peerInfo.fromMessage[1], + channelId: peerInfo.id + } + } + + return { + _: 'inputPeerChannel', + channelId: MAX_CHANNEL_ID - peerInfo.id, + accessHash: peerInfo.accessHash, + } + } + + throw new Error(`Invalid peer type: ${peerInfo.type}`) + } + getPeerById(peerId: number): tl.TypeInputPeer | null { if (peerId in this._cachedInputPeers) return this._cachedInputPeers[peerId] - const peer = ITelegramStorage.getInputPeer(this._state.entities[peerId]) + const peer = this._getInputPeer(this._state.entities[peerId]) if (peer) this._cachedInputPeers[peerId] = peer return peer } getPeerByPhone(phone: string): tl.TypeInputPeer | null { - return ITelegramStorage.getInputPeer( + return this._getInputPeer( this._state.entities[this._state.phoneIndex[phone]] ) } @@ -137,7 +187,7 @@ export class MemoryStorage implements ITelegramStorage { if (Date.now() - peer.updated > USERNAME_TTL) return null - return ITelegramStorage.getInputPeer(peer) + return this._getInputPeer(peer) } getSelf(): ITelegramStorage.SelfInfo | null { @@ -148,28 +198,18 @@ export class MemoryStorage implements ITelegramStorage { this._state.self = self } - setChannelPts(entityId: number, pts: number | null): void { - if (pts !== null) { - this._state.pts[entityId] = pts - } else { - delete this._state.pts[entityId] - } + setManyChannelPts(values: Record): void { + Object.keys(values).forEach((id: any) => { + this._state.pts[id] = values[id] + }) } getChannelPts(entityId: number): number | null { return this._state.pts[entityId] ?? null } - setCommonPts(val: [number | null, number | null] | null): void { - if (val) { - if (this._state.gpts) { - if (val[0] === null) val[0] = this._state.gpts[0] - if (val[1] === null) val[1] = this._state.gpts[1] - } else { - val = null - } - } - this._state.gpts = val as [number, number] | null + setCommonPts(val: [number, number]): void { + this._state.gpts = val } getCommonPts(): [number, number] | null { diff --git a/packages/core/src/utils/peer-utils.ts b/packages/core/src/utils/peer-utils.ts index cd4ecabd..78813fbc 100644 --- a/packages/core/src/utils/peer-utils.ts +++ b/packages/core/src/utils/peer-utils.ts @@ -1,5 +1,5 @@ import { tl } from '@mtcute/tl' -import { BasicPeerType, MaybeArray, PeerType } from '../types' +import { BasicPeerType, PeerType } from '../types' export const MIN_CHANNEL_ID = -1002147483647 export const MAX_CHANNEL_ID = -1000000000000 @@ -92,72 +92,167 @@ export function peerTypeToBasic(type: PeerType): BasicPeerType { throw new Error('Invalid peer type') } +function comparePeers( + first: tl.TypePeer | undefined, + second: tl.TypePeer | tl.TypeUser | tl.TypeChat +): boolean { + if (!first) return false + + if ('userId' in first) { + if ('userId' in second) return first.userId === second.userId + if (second._ === 'user' || second._ === 'userEmpty') + return first.userId === second.id + } + if ('chatId' in first) { + if ('chatId' in second) return first.chatId === second.chatId + if ( + second._ === 'chat' || + second._ === 'chatForbidden' || + second._ === 'chatEmpty' + ) + return first.chatId === second.id + } + if ('channelId' in first) { + if ('channelId' in second) return first.channelId === second.channelId + if (second._ === 'channel' || second._ === 'channelForbidden') + return first.channelId === second.id + } + return false +} + +function isRefMessage(msg: tl.TypeMessage, peer: any): boolean | undefined { + return ( + comparePeers(msg.peerId, peer) || + ('fromId' in msg && comparePeers(msg.fromId, peer)) || + ('fwdFrom' in msg && msg.fwdFrom && comparePeers(msg.fwdFrom.fromId, peer)) + ) +} + +function findContext(obj: any, peer: any): [number, number] | undefined { + if (!peer.min) return undefined + if (obj._ === 'updates' || obj._ === 'updatesCombined') { + for (const upd of obj.updates as tl.TypeUpdate[]) { + if ( + (upd._ === 'updateNewMessage' || + upd._ === 'updateNewChannelMessage' || + upd._ === 'updateEditMessage' || + upd._ === 'updateEditChannelMessage') && + isRefMessage(upd.message, peer) + ) { + return [getMarkedPeerId(upd.message.peerId!), upd.message.id] + } + } + } + + if (obj._ === 'updateShortMessage') { + return [obj.userId, obj.id] + } + + if (obj._ === 'updateShortChatMessage') { + return [-obj.chatId, obj.id] + } + + if ('messages' in obj || 'newMessages' in obj) { + for (const msg of (obj.messages || + obj.newMessages) as tl.TypeMessage[]) { + if (isRefMessage(msg, peer)) { + return [getMarkedPeerId(msg.peerId!), msg.id] + } + } + } + + // im not sure if this is exhaustive check or not + + return undefined +} + /** * Extracts all (cacheable) entities from a TlObject or a list of them. * Only checks `.user`, `.chat`, `.channel`, `.users` and `.chats` properties */ export function* getAllPeersFrom( - objects: MaybeArray + obj: any ): Iterable< - | tl.RawUser - | tl.RawChat - | tl.RawChatForbidden - | tl.RawChannel - | tl.RawChannelForbidden + (tl.TypeUser | tl.TypeChat) & { fromMessage: [number, number] | undefined } > { - if (!Array.isArray(objects)) objects = [objects] + if (typeof obj !== 'object') return - for (const obj of objects) { - if (typeof obj !== 'object') continue + if ( + obj._ === 'user' || + obj._ === 'chat' || + obj._ === 'channel' || + obj._ === 'chatForbidden' || + obj._ === 'channelForbidden' + ) { + yield obj + return + } + if (obj._ === 'userFull') { + yield obj.user + return + } - if ( - 'user' in obj && - typeof obj.user === 'object' && - obj.user._ === 'user' - ) { - yield obj.user - } + if ( + 'user' in obj && + typeof obj.user === 'object' && + obj.user._ === 'user' + ) { + yield obj.user + } - if ( - 'chat' in obj && - typeof obj.chat === 'object' && - (obj.chat._ === 'chat' || - obj.chat._ === 'channel' || - obj.chat._ === 'chatForbidden' || - obj.chat._ === 'channelForbidden') - ) { - yield obj.chat - } + if ( + 'chat' in obj && + typeof obj.chat === 'object' && + (obj.chat._ === 'chat' || + obj.chat._ === 'channel' || + obj.chat._ === 'chatForbidden' || + obj.chat._ === 'channelForbidden') + ) { + yield obj.chat + } - if ( - 'channel' in obj && - typeof obj.channel === 'object' && - (obj.channel._ === 'chat' || - obj.channel._ === 'channel' || - obj.channel._ === 'chatForbidden' || - obj.channel._ === 'channelForbidden') - ) { - yield obj.channel - } + if ( + 'channel' in obj && + typeof obj.channel === 'object' && + (obj.channel._ === 'chat' || + obj.channel._ === 'channel' || + obj.channel._ === 'chatForbidden' || + obj.channel._ === 'channelForbidden') + ) { + yield obj.channel + } - if ('users' in obj && Array.isArray(obj.users) && obj.users.length) { - for (const user of obj.users) { - // .users is sometimes number[] - if (typeof user === 'object' && user._ === 'user') yield user + if ('users' in obj && Array.isArray(obj.users) && obj.users.length) { + for (const user of obj.users) { + // .users is sometimes number[] + if (typeof user === 'object' && user._ === 'user') { + if (user.min && !user.bot) { + // min seems to be set for @Channel_Bot, + // but we don't really need to cache its context + // (we don't need to cache it at all, really, but whatever) + user.fromMessage = findContext(obj, user) + } + + yield user } } + } - if ('chats' in obj && Array.isArray(obj.chats) && obj.chats.length) { - for (const chat of obj.chats) { - // .chats is sometimes number[] - if ( - typeof chat === 'object' && - (chat._ === 'chat' || - chat._ === 'channel' || - chat._ === 'chatForbidden' || - chat._ === 'channelForbidden') - ) - yield chat + if ('chats' in obj && Array.isArray(obj.chats) && obj.chats.length) { + for (const chat of obj.chats) { + // .chats is sometimes number[] + if ( + typeof chat === 'object' && + (chat._ === 'chat' || + chat._ === 'channel' || + chat._ === 'chatForbidden' || + chat._ === 'channelForbidden') + ) { + if (chat.min) { + chat.fromMessage = findContext(obj, chat) + } + + yield chat } } }