rewrite(client): improved updates handling, added support for min entities
This commit is contained in:
parent
bcc5321cee
commit
87481c9a32
21 changed files with 879 additions and 422 deletions
|
@ -71,13 +71,18 @@ import {
|
|||
setDefaultParseMode,
|
||||
unregisterParseMode,
|
||||
} from './methods/parse-modes/parse-modes'
|
||||
import { catchUp } from './methods/updates/catch-up'
|
||||
import {
|
||||
_dispatchUpdate,
|
||||
addUpdateHandler,
|
||||
removeUpdateHandler,
|
||||
} from './methods/updates/dispatcher'
|
||||
import { _handleUpdate } from './methods/updates/handle-update'
|
||||
import {
|
||||
_fetchUpdatesState,
|
||||
_handleUpdate,
|
||||
_loadStorage,
|
||||
_saveStorage,
|
||||
catchUp,
|
||||
} from './methods/updates/handle-update'
|
||||
import { onNewMessage } from './methods/updates/on-new-message'
|
||||
import { blockUser } from './methods/users/block-user'
|
||||
import { getCommonChats } from './methods/users/get-common-chats'
|
||||
|
@ -112,8 +117,15 @@ import {
|
|||
handlers,
|
||||
} from './types'
|
||||
import { MaybeArray, MaybeAsync, TelegramConnection } from '@mtcute/core'
|
||||
import { Lock } from './utils/lock'
|
||||
|
||||
export class TelegramClient extends BaseTelegramClient {
|
||||
// from methods/auth/_initialize.ts
|
||||
protected _userId: number | null
|
||||
|
||||
// from methods/auth/_initialize.ts
|
||||
protected _isBot: boolean
|
||||
|
||||
// from methods/files/_initialize.ts
|
||||
protected _downloadConnections: Record<number, TelegramConnection>
|
||||
|
||||
|
@ -129,13 +141,35 @@ export class TelegramClient extends BaseTelegramClient {
|
|||
// from methods/updates/dispatcher.ts
|
||||
protected _groupsOrder: number[]
|
||||
|
||||
// from methods/updates/handle-update.ts
|
||||
protected _updLock: Lock
|
||||
|
||||
// from methods/updates/handle-update.ts
|
||||
protected _pts: number
|
||||
|
||||
// from methods/updates/handle-update.ts
|
||||
protected _date: number
|
||||
|
||||
// from methods/updates/handle-update.ts
|
||||
protected _cpts: Record<number, number>
|
||||
|
||||
constructor(opts: BaseTelegramClient.Options) {
|
||||
super(opts)
|
||||
this._userId = null
|
||||
this._isBot = false
|
||||
this._downloadConnections = {}
|
||||
this._parseModes = {}
|
||||
this._defaultParseMode = null
|
||||
this._groups = {}
|
||||
this._groupsOrder = []
|
||||
this._updLock = new Lock()
|
||||
// we dont need to initialize state fields since
|
||||
// they are always loaded either from the server, or from storage.
|
||||
|
||||
// channel PTS are not loaded immediately, and instead are cached here
|
||||
// after the first time they were retrieved from the storage.
|
||||
// they are later pushed into the storage.
|
||||
this._cpts = {}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1714,18 +1748,11 @@ export class TelegramClient extends BaseTelegramClient {
|
|||
setDefaultParseMode(name: string): void {
|
||||
return setDefaultParseMode.apply(this, arguments)
|
||||
}
|
||||
/**
|
||||
* Catch up with the server by loading missed updates.
|
||||
*
|
||||
*/
|
||||
catchUp(): Promise<void> {
|
||||
return catchUp.apply(this, arguments)
|
||||
}
|
||||
protected _dispatchUpdate(
|
||||
update: tl.TypeUpdate,
|
||||
update: tl.TypeUpdate | tl.TypeMessage,
|
||||
users: Record<number, tl.TypeUser>,
|
||||
chats: Record<number, tl.TypeChat>
|
||||
): Promise<void> {
|
||||
): void {
|
||||
return _dispatchUpdate.apply(this, arguments)
|
||||
}
|
||||
/**
|
||||
|
@ -1750,9 +1777,30 @@ export class TelegramClient extends BaseTelegramClient {
|
|||
): void {
|
||||
return removeUpdateHandler.apply(this, arguments)
|
||||
}
|
||||
/**
|
||||
* Fetch updates state from the server.
|
||||
* Meant to be used right after authorization,
|
||||
* but before force-saving the session.
|
||||
*/
|
||||
protected _fetchUpdatesState(): Promise<void> {
|
||||
return _fetchUpdatesState.apply(this, arguments)
|
||||
}
|
||||
protected _loadStorage(): Promise<void> {
|
||||
return _loadStorage.apply(this, arguments)
|
||||
}
|
||||
protected _saveStorage(): Promise<void> {
|
||||
return _saveStorage.apply(this, arguments)
|
||||
}
|
||||
protected _handleUpdate(update: tl.TypeUpdates): void {
|
||||
return _handleUpdate.apply(this, arguments)
|
||||
}
|
||||
/**
|
||||
* Catch up with the server by loading missed updates.
|
||||
*
|
||||
*/
|
||||
catchUp(): Promise<void> {
|
||||
return catchUp.apply(this, arguments)
|
||||
}
|
||||
/**
|
||||
* Register a message handler without any filters.
|
||||
*
|
||||
|
|
|
@ -35,3 +35,6 @@ import {
|
|||
|
||||
// @copy
|
||||
import { MaybeArray, MaybeAsync, TelegramConnection } from '@mtcute/core'
|
||||
|
||||
// @copy
|
||||
import { Lock } from '../utils/lock'
|
||||
|
|
17
packages/client/src/methods/auth/_initialize.ts
Normal file
17
packages/client/src/methods/auth/_initialize.ts
Normal file
|
@ -0,0 +1,17 @@
|
|||
import { TelegramClient } from '../../client'
|
||||
|
||||
// @extension
|
||||
interface AuthState {
|
||||
// local copy of "self" in storage,
|
||||
// so we can use it w/out relying on storage.
|
||||
// they are both loaded and saved to storage along with the updates
|
||||
// (see methods/updates/handle-update)
|
||||
_userId: number | null
|
||||
_isBot: boolean
|
||||
}
|
||||
|
||||
// @initialize
|
||||
function _initializeAuthState(this: TelegramClient) {
|
||||
this._userId = null
|
||||
this._isBot = false
|
||||
}
|
|
@ -37,10 +37,9 @@ export async function checkPassword(
|
|||
'user'
|
||||
)
|
||||
|
||||
await this.storage.setSelf({
|
||||
userId: res.user.id,
|
||||
isBot: false,
|
||||
})
|
||||
this._userId = res.user.id
|
||||
this._isBot = false
|
||||
await this._fetchUpdatesState()
|
||||
await this._saveStorage()
|
||||
|
||||
return new User(this, res.user)
|
||||
|
|
|
@ -17,6 +17,9 @@ export async function logOut(
|
|||
await this.call({ _: 'auth.logOut' })
|
||||
|
||||
if (resetSession) {
|
||||
this._userId = null
|
||||
this._isBot = false
|
||||
this._pts = this._seq = this._date = undefined as any
|
||||
this.storage.reset()
|
||||
await this._saveStorage()
|
||||
}
|
||||
|
|
|
@ -30,10 +30,8 @@ export async function recoverPassword(
|
|||
'user'
|
||||
)
|
||||
|
||||
await this.storage.setSelf({
|
||||
userId: res.user.id,
|
||||
isBot: false,
|
||||
})
|
||||
this._userId = res.user.id
|
||||
this._isBot = false
|
||||
await this._saveStorage()
|
||||
|
||||
return new User(this, res.user)
|
||||
|
|
|
@ -33,10 +33,9 @@ export async function signInBot(
|
|||
'user'
|
||||
)
|
||||
|
||||
await this.storage.setSelf({
|
||||
userId: res.user.id,
|
||||
isBot: true,
|
||||
})
|
||||
this._userId = res.user.id
|
||||
this._isBot = true
|
||||
await this._fetchUpdatesState()
|
||||
await this._saveStorage()
|
||||
|
||||
return new User(this, res.user)
|
||||
|
|
|
@ -41,10 +41,9 @@ export async function signIn(
|
|||
|
||||
assertTypeIs('signIn (@ auth.signIn -> user)', res.user, 'user')
|
||||
|
||||
await this.storage.setSelf({
|
||||
userId: res.user.id,
|
||||
isBot: false,
|
||||
})
|
||||
this._userId = res.user.id
|
||||
this._isBot = false
|
||||
await this._fetchUpdatesState()
|
||||
await this._saveStorage()
|
||||
|
||||
return new User(this, res.user)
|
||||
|
|
|
@ -32,10 +32,9 @@ export async function signUp(
|
|||
assertTypeIs('signUp (@ auth.signUp)', res, 'auth.authorization')
|
||||
assertTypeIs('signUp (@ auth.signUp -> user)', res.user, 'user')
|
||||
|
||||
await this.storage.setSelf({
|
||||
userId: res.user.id,
|
||||
isBot: false,
|
||||
})
|
||||
this._userId = res.user.id
|
||||
this._isBot = false
|
||||
await this._fetchUpdatesState()
|
||||
await this._saveStorage()
|
||||
|
||||
return new User(this, res.user)
|
||||
|
|
|
@ -222,7 +222,7 @@ export async function start(
|
|||
|
||||
if (e instanceof PasswordHashInvalidError) {
|
||||
if (params.invalidCodeCallback) {
|
||||
params.invalidCodeCallback('password')
|
||||
await params.invalidCodeCallback('password')
|
||||
} else {
|
||||
console.log('Invalid password. Please try again')
|
||||
}
|
||||
|
@ -245,7 +245,7 @@ export async function start(
|
|||
|
||||
if (result instanceof TermsOfService && !params.acceptTos) {
|
||||
if (params.tosCallback) {
|
||||
params.tosCallback(result)
|
||||
await params.tosCallback(result)
|
||||
} else {
|
||||
console.log(result.text)
|
||||
}
|
||||
|
|
|
@ -1,83 +0,0 @@
|
|||
import { TelegramClient } from '../../client'
|
||||
import { tl } from '@mtcute/tl'
|
||||
|
||||
const debug = require('debug')('mtcute:upds')
|
||||
|
||||
/**
|
||||
* Catch up with the server by loading missed updates.
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
export async function catchUp(this: TelegramClient): Promise<void> {
|
||||
// this doesn't work with missed channel updates properly
|
||||
// todo: fix
|
||||
const state = await this.storage.getCommonPts()
|
||||
if (!state) return
|
||||
|
||||
let [pts, date] = state
|
||||
|
||||
let error: Error | null = null
|
||||
try {
|
||||
for (;;) {
|
||||
const diff = await this.call({
|
||||
_: 'updates.getDifference',
|
||||
pts,
|
||||
date,
|
||||
qts: 0,
|
||||
})
|
||||
|
||||
if (
|
||||
diff._ === 'updates.difference' ||
|
||||
diff._ === 'updates.differenceSlice'
|
||||
) {
|
||||
const state =
|
||||
diff._ === 'updates.difference'
|
||||
? diff.state
|
||||
: diff.intermediateState
|
||||
pts = state.pts
|
||||
date = state.date
|
||||
|
||||
this._handleUpdate({
|
||||
_: 'updates',
|
||||
users: diff.users,
|
||||
chats: diff.chats,
|
||||
date: state.date,
|
||||
seq: state.seq,
|
||||
updates: [
|
||||
...diff.otherUpdates,
|
||||
...diff.newMessages.map(
|
||||
(m) =>
|
||||
({
|
||||
_: 'updateNewMessage',
|
||||
message: m,
|
||||
pts: 0,
|
||||
ptsCount: 0,
|
||||
} as tl.RawUpdateNewMessage)
|
||||
),
|
||||
],
|
||||
})
|
||||
|
||||
debug(
|
||||
'catching up... processed %d updates and %d messages',
|
||||
diff.otherUpdates.length,
|
||||
diff.newMessages.length
|
||||
)
|
||||
} else {
|
||||
if (diff._ === 'updates.differenceEmpty') {
|
||||
date = diff.date
|
||||
} else if (diff._ === 'updates.differenceTooLong') {
|
||||
pts = diff.pts
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
error = e
|
||||
debug('error while catching up: ' + error)
|
||||
}
|
||||
|
||||
debug('caught up')
|
||||
|
||||
await this.storage.setCommonPts([pts, date])
|
||||
await this.storage.save?.()
|
||||
}
|
|
@ -7,6 +7,7 @@ import {
|
|||
PropagationSymbol,
|
||||
StopPropagation,
|
||||
} from '../../types'
|
||||
import { createUsersChatsIndex } from '../../utils/peer-utils'
|
||||
|
||||
// @extension
|
||||
interface DispatcherExtension {
|
||||
|
@ -23,47 +24,71 @@ function _initializeDispatcher() {
|
|||
/**
|
||||
* @internal
|
||||
*/
|
||||
export async function _dispatchUpdate(
|
||||
export function _dispatchUpdate(
|
||||
this: TelegramClient,
|
||||
update: tl.TypeUpdate,
|
||||
update: tl.TypeUpdate | tl.TypeMessage,
|
||||
users: Record<number, tl.TypeUser>,
|
||||
chats: Record<number, tl.TypeChat>
|
||||
): Promise<void> {
|
||||
let message: Message | null = null
|
||||
if (
|
||||
update._ === 'updateNewMessage' ||
|
||||
update._ === 'updateNewChannelMessage' ||
|
||||
update._ === 'updateNewScheduledMessage' ||
|
||||
update._ === 'updateEditMessage' ||
|
||||
update._ === 'updateEditChannelMessage'
|
||||
) {
|
||||
message = new Message(this, update.message, users, chats)
|
||||
}
|
||||
): void {
|
||||
;(async () => {
|
||||
// order does not matter in the dispatcher,
|
||||
// so we can handle each update in its own task
|
||||
|
||||
for (const grp of this._groupsOrder) {
|
||||
for (const handler of this._groups[grp]) {
|
||||
let result: void | PropagationSymbol
|
||||
const isRawMessage = tl.isAnyMessage(update)
|
||||
|
||||
if (
|
||||
handler.type === 'raw' &&
|
||||
(!handler.check ||
|
||||
(await handler.check(this, update, users, chats)))
|
||||
) {
|
||||
result = await handler.callback(this, update, users, chats)
|
||||
} else if (
|
||||
handler.type === 'new_message' &&
|
||||
message &&
|
||||
(!handler.check || (await handler.check(message, this)))
|
||||
) {
|
||||
result = await handler.callback(message, this)
|
||||
} else continue
|
||||
|
||||
if (result === ContinuePropagation) continue
|
||||
if (result === StopPropagation) return
|
||||
|
||||
break
|
||||
let message: Message | null = null
|
||||
if (
|
||||
update._ === 'updateNewMessage' ||
|
||||
update._ === 'updateNewChannelMessage' ||
|
||||
update._ === 'updateNewScheduledMessage' ||
|
||||
update._ === 'updateEditMessage' ||
|
||||
update._ === 'updateEditChannelMessage' ||
|
||||
isRawMessage
|
||||
) {
|
||||
message = new Message(
|
||||
this,
|
||||
isRawMessage ? update : (update as any).message,
|
||||
users,
|
||||
chats
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
for (const grp of this._groupsOrder) {
|
||||
for (const handler of this._groups[grp]) {
|
||||
let result: void | PropagationSymbol
|
||||
|
||||
if (
|
||||
handler.type === 'raw' &&
|
||||
!isRawMessage &&
|
||||
(!handler.check ||
|
||||
(await handler.check(
|
||||
this,
|
||||
update as any,
|
||||
users,
|
||||
chats
|
||||
)))
|
||||
) {
|
||||
result = await handler.callback(
|
||||
this,
|
||||
update as any,
|
||||
users,
|
||||
chats
|
||||
)
|
||||
} else if (
|
||||
handler.type === 'new_message' &&
|
||||
message &&
|
||||
(!handler.check || (await handler.check(message, this)))
|
||||
) {
|
||||
result = await handler.callback(message, this)
|
||||
} else continue
|
||||
|
||||
if (result === ContinuePropagation) continue
|
||||
if (result === StopPropagation) return
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
})().catch((err) => this._emitError(err))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1,12 +1,199 @@
|
|||
import { tl } from '@mtcute/tl'
|
||||
import { TelegramClient } from '../../client'
|
||||
import { ChannelPrivateError } from '@mtcute/tl/errors'
|
||||
import { MAX_CHANNEL_ID } from '@mtcute/core'
|
||||
import { createUsersChatsIndex, normalizeToInputChannel } from '../../utils/peer-utils'
|
||||
import {
|
||||
createUsersChatsIndex,
|
||||
normalizeToInputChannel,
|
||||
normalizeToInputUser,
|
||||
peerToInputPeer,
|
||||
} from '../../utils/peer-utils'
|
||||
import { extractChannelIdFromUpdate } from '../../utils/misc-utils'
|
||||
import { Lock } from '../../utils/lock'
|
||||
import bigInt from 'big-integer'
|
||||
import { MAX_CHANNEL_ID } from '../../../../core'
|
||||
|
||||
const debug = require('debug')('mtcute:upds')
|
||||
|
||||
// i tried to implement updates seq, but that thing seems to be
|
||||
// broken on the server side, lol (see https://t.me/teispam/1199, ru)
|
||||
// tldr server sends multiple `updates` with the same seq, and that seq
|
||||
// is also larger than the seq in the initial updates.getState response
|
||||
|
||||
// @extension
|
||||
interface UpdatesState {
|
||||
_updLock: Lock
|
||||
|
||||
// accessing storage every time might be expensive,
|
||||
// so store everything here, and load & save
|
||||
// every time session is loaded & saved.
|
||||
_pts: number
|
||||
_date: number
|
||||
// _seq: number
|
||||
_cpts: Record<number, number>
|
||||
}
|
||||
|
||||
// @initialize
|
||||
function _initializeUpdates(this: TelegramClient) {
|
||||
this._updLock = new Lock()
|
||||
// we dont need to initialize state fields since
|
||||
// they are always loaded either from the server, or from storage.
|
||||
|
||||
// channel PTS are not loaded immediately, and instead are cached here
|
||||
// after the first time they were retrieved from the storage.
|
||||
// they are later pushed into the storage.
|
||||
this._cpts = {}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch updates state from the server.
|
||||
* Meant to be used right after authorization,
|
||||
* but before force-saving the session.
|
||||
* @internal
|
||||
*/
|
||||
export async function _fetchUpdatesState(this: TelegramClient): Promise<void> {
|
||||
const state = await this.call({ _: 'updates.getState' })
|
||||
this._pts = state.pts
|
||||
this._date = state.date
|
||||
// this._seq = state.seq
|
||||
debug(
|
||||
'loaded initial state: pts=%d, date=%d', // , seq=%d',
|
||||
state.pts,
|
||||
state.date
|
||||
// state.seq
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
export async function _loadStorage(this: TelegramClient): Promise<void> {
|
||||
// load updates state from the session
|
||||
await this.storage.load?.()
|
||||
const state = await this.storage.getCommonPts()
|
||||
if (state) {
|
||||
this._pts = state[0]
|
||||
this._date = state[1]
|
||||
// this._seq = state[2]
|
||||
}
|
||||
// if no state, don't bother initializing properties
|
||||
// since that means that there is no authorization,
|
||||
// and thus _fetchUpdatesState will be called
|
||||
|
||||
const self = await this.storage.getSelf()
|
||||
if (self) {
|
||||
this._userId = self.userId
|
||||
this._isBot = self.isBot
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
export async function _saveStorage(this: TelegramClient): Promise<void> {
|
||||
// save updates state to the session
|
||||
|
||||
// before any authorization pts will be undefined
|
||||
if (this._pts !== undefined) {
|
||||
await this.storage.setCommonPts([this._pts, this._date]) // , this._seq])
|
||||
await this.storage.setManyChannelPts(this._cpts)
|
||||
}
|
||||
if (this._userId !== null) {
|
||||
await this.storage.setSelf({
|
||||
userId: this._userId,
|
||||
isBot: this._isBot,
|
||||
})
|
||||
}
|
||||
|
||||
await this.storage.save?.()
|
||||
}
|
||||
|
||||
async function _loadDifference(this: TelegramClient): Promise<void> {
|
||||
for (;;) {
|
||||
const diff = await this.call({
|
||||
_: 'updates.getDifference',
|
||||
pts: this._pts,
|
||||
date: this._date,
|
||||
qts: 0,
|
||||
})
|
||||
|
||||
if (
|
||||
diff._ === 'updates.differenceEmpty' ||
|
||||
diff._ === 'updates.differenceTooLong'
|
||||
)
|
||||
return
|
||||
|
||||
const state =
|
||||
diff._ === 'updates.difference'
|
||||
? diff.state
|
||||
: diff.intermediateState
|
||||
|
||||
await this._cachePeersFrom(diff)
|
||||
|
||||
const { users, chats } = createUsersChatsIndex(diff)
|
||||
|
||||
diff.newMessages.forEach((message) =>
|
||||
this._dispatchUpdate(message, users, chats)
|
||||
)
|
||||
diff.otherUpdates.forEach((upd) =>
|
||||
this._dispatchUpdate(upd, users, chats)
|
||||
)
|
||||
|
||||
this._pts = state.pts
|
||||
this._date = state.date
|
||||
|
||||
if (diff._ === 'updates.difference') return
|
||||
}
|
||||
}
|
||||
|
||||
async function _loadChannelDifference(
|
||||
this: TelegramClient,
|
||||
channelId: number
|
||||
): Promise<void> {
|
||||
let channel
|
||||
try {
|
||||
channel = normalizeToInputChannel(
|
||||
await this.resolvePeer(MAX_CHANNEL_ID - channelId)
|
||||
)!
|
||||
} catch (e) {
|
||||
return
|
||||
}
|
||||
|
||||
let pts = this._cpts[channelId]
|
||||
if (!pts) {
|
||||
pts = (await this.storage.getChannelPts(channelId)) ?? 0
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
const diff = await this.call({
|
||||
_: 'updates.getChannelDifference',
|
||||
channel,
|
||||
pts,
|
||||
limit: this._isBot ? 1000 : 100,
|
||||
filter: { _: 'channelMessagesFilterEmpty' },
|
||||
})
|
||||
|
||||
if (
|
||||
diff._ === 'updates.channelDifferenceEmpty' ||
|
||||
diff._ === 'updates.channelDifferenceTooLong'
|
||||
)
|
||||
return
|
||||
|
||||
await this._cachePeersFrom(diff)
|
||||
|
||||
const { users, chats } = createUsersChatsIndex(diff)
|
||||
|
||||
diff.newMessages.forEach((message) =>
|
||||
this._dispatchUpdate(message, users, chats)
|
||||
)
|
||||
diff.otherUpdates.forEach((upd) =>
|
||||
this._dispatchUpdate(upd, users, chats)
|
||||
)
|
||||
|
||||
pts = diff.pts
|
||||
|
||||
if (diff.final) break
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
|
@ -14,177 +201,287 @@ export function _handleUpdate(
|
|||
this: TelegramClient,
|
||||
update: tl.TypeUpdates
|
||||
): void {
|
||||
;(async () => {
|
||||
debug('received %s', update._)
|
||||
// just in case, check that updates state is available
|
||||
if (this._pts === undefined) {
|
||||
debug('received an update before updates state is available')
|
||||
return
|
||||
}
|
||||
|
||||
// https://github.com/pyrogram/pyrogram/blob/a86656aefcc93cc3d2f5c98227d5da28fcddb136/pyrogram/client.py#L521
|
||||
if (update._ === 'updates' || update._ === 'updatesCombined') {
|
||||
const isMin = await this._cachePeersFrom(update)
|
||||
// we want to process updates in order, so we use a lock
|
||||
// it is *very* important that the lock is released, otherwise
|
||||
// the incoming updates will be stuck forever, eventually killing the process with OOM
|
||||
// thus, we wrap everything in what basically is a try..finally
|
||||
|
||||
const { users, chats } = createUsersChatsIndex(update)
|
||||
// additionally, locking here blocks updates handling while we are
|
||||
// loading difference inside update handler.
|
||||
|
||||
for (const upd of update.updates) {
|
||||
if (upd._ === 'updateChannelTooLong') {
|
||||
// what are we supposed to do with this?
|
||||
debug(
|
||||
'received updateChannelTooLong for channel %d (pts %d)',
|
||||
upd.channelId,
|
||||
upd.pts
|
||||
)
|
||||
continue
|
||||
}
|
||||
this._updLock
|
||||
.acquire()
|
||||
.then(async () => {
|
||||
debug('received %s', update._)
|
||||
|
||||
const channelId = extractChannelIdFromUpdate(upd)
|
||||
const pts = 'pts' in upd ? upd.pts : undefined
|
||||
const ptsCount = 'ptsCount' in upd ? upd.ptsCount : undefined
|
||||
const date = 'date' in upd ? upd.date : undefined
|
||||
// i tried my best to follow the documentation, but i still may have missed something.
|
||||
// feel free to contribute!
|
||||
// reference: https://core.telegram.org/api/updates
|
||||
if (update._ === 'updatesTooLong') {
|
||||
// "there are too many events pending to be pushed to the client", we need to fetch them manually
|
||||
await _loadDifference.call(this)
|
||||
} else if (
|
||||
update._ === 'updates' ||
|
||||
update._ === 'updatesCombined'
|
||||
) {
|
||||
// const seqStart =
|
||||
// update._ === 'updatesCombined'
|
||||
// ? update.seqStart
|
||||
// : update.seq
|
||||
// const nextLocalSeq = this._seq + 1
|
||||
//
|
||||
// debug('received %s (seq_start=%d, seq_end=%d)', update._, seqStart, update.seq)
|
||||
//
|
||||
// if (nextLocalSeq > seqStart)
|
||||
// // "the updates were already applied, and must be ignored"
|
||||
// return
|
||||
// if (nextLocalSeq < seqStart)
|
||||
// // "there's an updates gap that must be filled"
|
||||
// // loading difference will also load any updates contained
|
||||
// // in this update, so we discard it
|
||||
// return await _loadDifference.call(this)
|
||||
|
||||
if (upd._ === 'updateNewChannelMessage' && isMin) {
|
||||
// min entities are useless, so we need to fetch actual entities
|
||||
const msg = upd.message
|
||||
await this._cachePeersFrom(update)
|
||||
const { users, chats } = createUsersChatsIndex(update)
|
||||
|
||||
if (msg._ !== 'messageEmpty') {
|
||||
let diff:
|
||||
| tl.RpcCallReturn['updates.getChannelDifference']
|
||||
| null = null
|
||||
for (const upd of update.updates) {
|
||||
if (upd._ === 'updateChannelTooLong') {
|
||||
if (upd.pts) {
|
||||
this._cpts[upd.channelId] = upd.pts
|
||||
}
|
||||
return await _loadChannelDifference.call(this, upd.channelId)
|
||||
}
|
||||
|
||||
const channel = normalizeToInputChannel(
|
||||
await this.resolvePeer(MAX_CHANNEL_ID - channelId!)
|
||||
)
|
||||
if (!channel) return
|
||||
const channelId = extractChannelIdFromUpdate(upd)
|
||||
const pts = 'pts' in upd ? upd.pts : undefined
|
||||
const ptsCount =
|
||||
'ptsCount' in upd ? upd.ptsCount : undefined
|
||||
|
||||
try {
|
||||
diff = await this.call({
|
||||
_: 'updates.getChannelDifference',
|
||||
channel: channel,
|
||||
filter: {
|
||||
_: 'channelMessagesFilter',
|
||||
ranges: [
|
||||
{
|
||||
_: 'messageRange',
|
||||
minId: upd.message.id,
|
||||
maxId: upd.message.id,
|
||||
},
|
||||
],
|
||||
},
|
||||
pts: pts! - ptsCount!,
|
||||
limit: pts!,
|
||||
})
|
||||
} catch (e) {
|
||||
if (!(e instanceof ChannelPrivateError)) throw e
|
||||
if (pts !== undefined && ptsCount !== undefined) {
|
||||
let nextLocalPts
|
||||
if (channelId === undefined)
|
||||
nextLocalPts = this._pts + ptsCount
|
||||
else if (channelId in this._cpts)
|
||||
nextLocalPts = this._cpts[channelId] + ptsCount
|
||||
else {
|
||||
const saved = await this.storage.getChannelPts(
|
||||
channelId
|
||||
)
|
||||
if (saved) {
|
||||
this._cpts[channelId] = saved
|
||||
nextLocalPts = saved + ptsCount
|
||||
} else {
|
||||
nextLocalPts = null
|
||||
}
|
||||
}
|
||||
|
||||
if (
|
||||
diff &&
|
||||
diff._ !== 'updates.channelDifferenceEmpty'
|
||||
) {
|
||||
diff.users.forEach((u) => (users[u.id] = u))
|
||||
diff.chats.forEach((u) => (chats[u.id] = u))
|
||||
if (nextLocalPts) {
|
||||
if (nextLocalPts > pts)
|
||||
// "the update was already applied, and must be ignored"
|
||||
return
|
||||
if (nextLocalPts < pts)
|
||||
// "there's an update gap that must be filled"
|
||||
// same as before, loading diff will also load
|
||||
// any of the pending updates, so we don't need
|
||||
// to bother handling them further.
|
||||
if (channelId) {
|
||||
return await _loadChannelDifference.call(this, channelId)
|
||||
} else {
|
||||
return await _loadDifference.call(this)
|
||||
}
|
||||
}
|
||||
|
||||
this._dispatchUpdate(upd, users, chats)
|
||||
|
||||
if (channelId) {
|
||||
this._cpts[channelId] = pts
|
||||
} else {
|
||||
this._pts = pts
|
||||
}
|
||||
} else {
|
||||
this._dispatchUpdate(upd, users, chats)
|
||||
}
|
||||
}
|
||||
|
||||
if (channelId && pts) {
|
||||
await this.storage.setChannelPts(channelId, pts)
|
||||
}
|
||||
if (!channelId && (pts || date)) {
|
||||
await this.storage.setCommonPts([pts || null, date || null])
|
||||
// this._seq = update.seq
|
||||
this._date = update.date
|
||||
} else if (update._ === 'updateShort') {
|
||||
const upd = update.update
|
||||
if (upd._ === 'updateDcOptions' && this._config) {
|
||||
;(this._config as tl.Mutable<tl.TypeConfig>).dcOptions =
|
||||
upd.dcOptions
|
||||
} else if (upd._ === 'updateConfig') {
|
||||
this._config = await this.call({ _: 'help.getConfig' })
|
||||
} else {
|
||||
this._dispatchUpdate(upd, {}, {})
|
||||
}
|
||||
|
||||
await this._dispatchUpdate(upd, users, chats)
|
||||
}
|
||||
this._date = update.date
|
||||
} else if (update._ === 'updateShortMessage') {
|
||||
const message: tl.RawMessage = {
|
||||
_: 'message',
|
||||
out: update.out,
|
||||
mentioned: update.mentioned,
|
||||
mediaUnread: update.mediaUnread,
|
||||
silent: update.silent,
|
||||
id: update.id,
|
||||
fromId: {
|
||||
_: 'peerUser',
|
||||
userId: update.out ? this._userId! : update.userId,
|
||||
},
|
||||
peerId: {
|
||||
_: 'peerUser',
|
||||
userId: update.userId,
|
||||
},
|
||||
fwdFrom: update.fwdFrom,
|
||||
viaBotId: update.viaBotId,
|
||||
replyTo: update.replyTo,
|
||||
date: update.date,
|
||||
message: update.message,
|
||||
entities: update.entities,
|
||||
ttlPeriod: update.ttlPeriod,
|
||||
}
|
||||
|
||||
await this.storage.setCommonPts([null, update.date])
|
||||
// } else if (update._ === 'updateShortMessage') {
|
||||
// const self = await this.storage.getSelf()
|
||||
//
|
||||
// const message: tl.RawMessage = {
|
||||
// _: 'message',
|
||||
// out: update.out,
|
||||
// mentioned: update.mentioned,
|
||||
// mediaUnread: update.mediaUnread,
|
||||
// silent: update.silent,
|
||||
// id: update.id,
|
||||
// fromId: {
|
||||
// _: 'peerUser',
|
||||
// userId: update.out ? self!.userId : update.userId
|
||||
// },
|
||||
// peerId: {
|
||||
// _: 'peerUser',
|
||||
// userId: update.userId
|
||||
// },
|
||||
// fwdFrom: update.fwdFrom,
|
||||
// viaBotId: update.viaBotId,
|
||||
// replyTo: update.replyTo,
|
||||
// date: update.date,
|
||||
// message: update.message,
|
||||
// entities: update.entities,
|
||||
// ttlPeriod: update.ttlPeriod
|
||||
// }
|
||||
// } else if (update._ === 'updateShortChatMessage') {
|
||||
// const message: tl.RawMessage = {
|
||||
// _: 'message',
|
||||
// out: update.out,
|
||||
// mentioned: update.mentioned,
|
||||
// mediaUnread: update.mediaUnread,
|
||||
// silent: update.silent,
|
||||
// id: update.id,
|
||||
// fromId: {
|
||||
// _: 'peerUser',
|
||||
// userId: update.fromId
|
||||
// },
|
||||
// peerId: {
|
||||
// _: 'peerChat',
|
||||
// chatId: update.chatId
|
||||
// },
|
||||
// fwdFrom: update.fwdFrom,
|
||||
// viaBotId: update.viaBotId,
|
||||
// replyTo: update.replyTo,
|
||||
// date: update.date,
|
||||
// message: update.message,
|
||||
// entities: update.entities,
|
||||
// ttlPeriod: update.ttlPeriod
|
||||
// }
|
||||
//
|
||||
} else if (
|
||||
update._ === 'updateShortMessage' ||
|
||||
update._ === 'updateShortChatMessage'
|
||||
) {
|
||||
await this.storage.setCommonPts([update.pts, update.date])
|
||||
// now we need to fetch info about users involved.
|
||||
// since this update is only used for PM, we can just
|
||||
// fetch the current user and the other user.
|
||||
// additionally, we need to handle "forwarded from"
|
||||
// field, as it may contain a user OR a channel
|
||||
const fwdFrom = update.fwdFrom?.fromId
|
||||
? peerToInputPeer(update.fwdFrom.fromId)
|
||||
: undefined
|
||||
|
||||
// these short updates don't contain users & chats,
|
||||
// so we use updates.getDifference to fetch them
|
||||
// definitely not the best way, but whatever
|
||||
const diff = await this.call({
|
||||
_: 'updates.getDifference',
|
||||
pts: update.pts - update.ptsCount,
|
||||
date: update.date,
|
||||
qts: -1,
|
||||
})
|
||||
|
||||
if (diff._ === 'updates.difference') {
|
||||
if (diff.newMessages.length) {
|
||||
const { users, chats } = createUsersChatsIndex(diff)
|
||||
|
||||
await this._dispatchUpdate(
|
||||
let rawUsers: tl.TypeUser[]
|
||||
{
|
||||
const id: tl.TypeInputUser[] = [
|
||||
{ _: 'inputUserSelf' },
|
||||
{
|
||||
_: 'updateNewMessage',
|
||||
message: diff.newMessages[0],
|
||||
pts: update.pts,
|
||||
ptsCount: update.ptsCount,
|
||||
_: 'inputUser',
|
||||
userId: update.userId,
|
||||
accessHash: bigInt.zero,
|
||||
},
|
||||
users,
|
||||
chats
|
||||
)
|
||||
} else if (diff.otherUpdates.length) {
|
||||
await this._dispatchUpdate(diff.otherUpdates[0], {}, {})
|
||||
]
|
||||
|
||||
if (fwdFrom) {
|
||||
const inputUser = normalizeToInputUser(fwdFrom)
|
||||
if (inputUser) id.push(inputUser)
|
||||
}
|
||||
|
||||
rawUsers = await this.call({
|
||||
_: 'users.getUsers',
|
||||
id,
|
||||
})
|
||||
}
|
||||
let rawChats: tl.TypeChat[] = []
|
||||
if (fwdFrom) {
|
||||
const inputChannel = normalizeToInputChannel(fwdFrom)
|
||||
if (inputChannel)
|
||||
rawChats = await this.call({
|
||||
_: 'channels.getChannels',
|
||||
id: [inputChannel],
|
||||
}).then((res) => res.chats)
|
||||
}
|
||||
|
||||
this._date = update.date
|
||||
|
||||
const { users, chats } = createUsersChatsIndex({
|
||||
users: rawUsers,
|
||||
chats: rawChats,
|
||||
})
|
||||
this._dispatchUpdate(message, users, chats)
|
||||
} else if (update._ === 'updateShortChatMessage') {
|
||||
const message: tl.RawMessage = {
|
||||
_: 'message',
|
||||
out: update.out,
|
||||
mentioned: update.mentioned,
|
||||
mediaUnread: update.mediaUnread,
|
||||
silent: update.silent,
|
||||
id: update.id,
|
||||
fromId: {
|
||||
_: 'peerUser',
|
||||
userId: update.fromId,
|
||||
},
|
||||
peerId: {
|
||||
_: 'peerChat',
|
||||
chatId: update.chatId,
|
||||
},
|
||||
fwdFrom: update.fwdFrom,
|
||||
viaBotId: update.viaBotId,
|
||||
replyTo: update.replyTo,
|
||||
date: update.date,
|
||||
message: update.message,
|
||||
entities: update.entities,
|
||||
ttlPeriod: update.ttlPeriod,
|
||||
}
|
||||
|
||||
// similarly to updateShortMessage, we need to fetch the sender
|
||||
// user and the chat, and also handle "forwarded from" info.
|
||||
const fwdFrom = update.fwdFrom?.fromId
|
||||
? peerToInputPeer(update.fwdFrom.fromId)
|
||||
: undefined
|
||||
|
||||
let rawUsers: tl.TypeUser[]
|
||||
{
|
||||
const id: tl.TypeInputUser[] = [
|
||||
{ _: 'inputUserSelf' },
|
||||
{
|
||||
_: 'inputUser',
|
||||
userId: update.fromId,
|
||||
accessHash: bigInt.zero,
|
||||
},
|
||||
]
|
||||
|
||||
if (fwdFrom) {
|
||||
const inputUser = normalizeToInputUser(fwdFrom)
|
||||
if (inputUser) id.push(inputUser)
|
||||
}
|
||||
|
||||
rawUsers = await this.call({
|
||||
_: 'users.getUsers',
|
||||
id,
|
||||
})
|
||||
}
|
||||
const rawChats = await this.call({
|
||||
_: 'messages.getChats',
|
||||
id: [update.chatId],
|
||||
}).then((res) => res.chats)
|
||||
|
||||
if (fwdFrom) {
|
||||
const inputChannel = normalizeToInputChannel(fwdFrom)
|
||||
if (inputChannel) {
|
||||
const res = await this.call({
|
||||
_: 'channels.getChannels',
|
||||
id: [inputChannel],
|
||||
})
|
||||
rawChats.push(...res.chats)
|
||||
}
|
||||
}
|
||||
|
||||
this._date = update.date
|
||||
const { users, chats } = createUsersChatsIndex({
|
||||
users: rawUsers,
|
||||
chats: rawChats,
|
||||
})
|
||||
this._dispatchUpdate(message, users, chats)
|
||||
}
|
||||
} else if (update._ === 'updateShort') {
|
||||
await this._dispatchUpdate(update.update, {}, {})
|
||||
await this.storage.setCommonPts([null, update.date])
|
||||
} else if (update._ === 'updatesTooLong') {
|
||||
debug('got updatesTooLong')
|
||||
}
|
||||
})().catch((err) => this._emitError(err))
|
||||
})
|
||||
.catch((err) => this._emitError(err))
|
||||
.then(() => this._updLock.release())
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Catch up with the server by loading missed updates.
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
export function catchUp(this: TelegramClient): Promise<void> {
|
||||
return _loadDifference.call(this)
|
||||
}
|
||||
|
||||
|
|
|
@ -150,9 +150,7 @@ export namespace Message {
|
|||
readonly inviter: number
|
||||
}
|
||||
|
||||
export interface MessageForwardInfo<
|
||||
Sender extends User | Chat | string = User | Chat | string
|
||||
> {
|
||||
export interface MessageForwardInfo {
|
||||
/**
|
||||
* Date the original message was sent
|
||||
*/
|
||||
|
@ -162,7 +160,7 @@ export namespace Message {
|
|||
* Sender of the original message (either user or a channel)
|
||||
* or their name (for users with private forwards)
|
||||
*/
|
||||
sender: Sender
|
||||
sender: User | Chat | string
|
||||
|
||||
/**
|
||||
* For messages forwarded from channels,
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import { TelegramClient } from '../../client'
|
||||
import { MaybeArray, MaybeAsync } from '@mtcute/core'
|
||||
import { Message } from '../messages'
|
||||
import { User } from '../peers'
|
||||
import { Chat, User } from '../peers'
|
||||
import {
|
||||
Dice,
|
||||
Photo,
|
||||
|
@ -285,6 +285,14 @@ export namespace filters {
|
|||
export const bot: UpdateFilter<Message, { sender: User }> = (msg) =>
|
||||
msg.sender instanceof User && msg.sender.isBot
|
||||
|
||||
/**
|
||||
* Filter messages sent in broadcast channels
|
||||
*/
|
||||
export const channel: UpdateFilter<
|
||||
Message,
|
||||
{ chat: Modify<Chat, { type: 'channel' }> }
|
||||
> = (msg) => msg.chat.type === 'channel'
|
||||
|
||||
/**
|
||||
* Filter incoming messages.
|
||||
*
|
||||
|
|
24
packages/client/src/utils/lock.ts
Normal file
24
packages/client/src/utils/lock.ts
Normal file
|
@ -0,0 +1,24 @@
|
|||
/** @internal */
|
||||
export class Lock {
|
||||
private _prom: Promise<void> | null = null
|
||||
private _unlock: (() => void) | null = null
|
||||
|
||||
constructor() {
|
||||
this._prom = null
|
||||
this._unlock = null
|
||||
}
|
||||
|
||||
async acquire(): Promise<void> {
|
||||
if (this._prom) await this._prom
|
||||
this._prom = new Promise((resolve) => {
|
||||
this._unlock = resolve
|
||||
})
|
||||
}
|
||||
|
||||
release(): void {
|
||||
if (!this._unlock) return
|
||||
this._unlock()
|
||||
this._prom = null
|
||||
this._unlock = null
|
||||
}
|
||||
}
|
|
@ -1,4 +1,5 @@
|
|||
import { tl } from '@mtcute/tl'
|
||||
import bigInt from 'big-integer'
|
||||
|
||||
export const INVITE_LINK_REGEX = /^(?:https?:\/\/)?(?:www\.)?(?:t(?:elegram)?\.(?:org|me|dog)\/joinchat\/)([\w-]+)$/i
|
||||
|
||||
|
@ -77,7 +78,19 @@ export function inputPeerToPeer(inp: tl.TypeInputPeer): tl.TypePeer {
|
|||
|
||||
if (inp._ === 'inputPeerChat') return { _: 'peerChat', chatId: inp.chatId }
|
||||
|
||||
return inp as never
|
||||
throw new Error(`Cannot convert ${inp._} to peer`)
|
||||
}
|
||||
|
||||
export function peerToInputPeer(peer: tl.TypePeer, accessHash = bigInt.zero): tl.TypeInputPeer {
|
||||
if (peer._ === 'peerUser')
|
||||
return { _: 'inputPeerUser', userId: peer.userId, accessHash }
|
||||
|
||||
if (peer._ === 'peerChannel')
|
||||
return { _: 'inputPeerChannel', channelId: peer.channelId, accessHash }
|
||||
|
||||
if (peer._ === 'peerChat') return { _: 'inputPeerChat', chatId: peer.chatId }
|
||||
|
||||
return peer as never
|
||||
}
|
||||
|
||||
export function createUsersChatsIndex(
|
||||
|
|
|
@ -207,8 +207,8 @@ export class BaseTelegramClient {
|
|||
private _lastRequestTime = 0
|
||||
private _floodWaitedRequests: Record<string, number> = {}
|
||||
|
||||
private _config?: tl.RawConfig
|
||||
private _cdnConfig?: tl.RawCdnConfig
|
||||
protected _config?: tl.RawConfig
|
||||
protected _cdnConfig?: tl.RawCdnConfig
|
||||
|
||||
private _additionalConnections: TelegramConnection[] = []
|
||||
|
||||
|
@ -639,15 +639,13 @@ export class BaseTelegramClient {
|
|||
|
||||
/**
|
||||
* Adds all peers from a given object to entity cache in storage.
|
||||
* Returns boolean indicating whether there were any `min` entities.
|
||||
*/
|
||||
protected async _cachePeersFrom(obj: any): Promise<boolean> {
|
||||
let isMin = false
|
||||
protected async _cachePeersFrom(obj: any): Promise<void> {
|
||||
const parsedPeers: ITelegramStorage.PeerInfo[] = []
|
||||
|
||||
for (const peer of getAllPeersFrom(obj)) {
|
||||
if ('min' in peer && peer.min) {
|
||||
isMin = true
|
||||
if ((peer as any).min && !peer.fromMessage && !(peer as any).bot) {
|
||||
debug('peer is min, but no context was found: %o %o', obj, peer)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -659,6 +657,7 @@ export class BaseTelegramClient {
|
|||
phone: peer.phone ?? null,
|
||||
type: peer.bot ? 'bot' : 'user',
|
||||
updated: 0,
|
||||
fromMessage: peer.fromMessage
|
||||
})
|
||||
} else if (peer._ === 'chat' || peer._ === 'chatForbidden') {
|
||||
parsedPeers.push({
|
||||
|
@ -668,6 +667,7 @@ export class BaseTelegramClient {
|
|||
phone: null,
|
||||
type: 'group',
|
||||
updated: 0,
|
||||
fromMessage: peer.fromMessage
|
||||
})
|
||||
} else if (peer._ === 'channel' || peer._ === 'channelForbidden') {
|
||||
parsedPeers.push({
|
||||
|
@ -680,12 +680,11 @@ export class BaseTelegramClient {
|
|||
phone: null,
|
||||
type: peer.broadcast ? 'channel' : 'supergroup',
|
||||
updated: 0,
|
||||
fromMessage: peer.fromMessage
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
await this.storage.updatePeers(parsedPeers)
|
||||
|
||||
return isMin
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,37 +11,14 @@ export namespace ITelegramStorage {
|
|||
username: string | null
|
||||
phone: string | null
|
||||
updated: number
|
||||
// marked peer id of chat, message id
|
||||
fromMessage?: [number, number]
|
||||
}
|
||||
|
||||
export interface SelfInfo {
|
||||
isBot: boolean
|
||||
userId: number
|
||||
}
|
||||
|
||||
export function getInputPeer(peerInfo?: PeerInfo): tl.TypeInputPeer | null {
|
||||
if (!peerInfo) return null
|
||||
if (peerInfo.type === 'user' || peerInfo.type === 'bot')
|
||||
return {
|
||||
_: 'inputPeerUser',
|
||||
userId: peerInfo.id,
|
||||
accessHash: peerInfo.accessHash,
|
||||
}
|
||||
|
||||
if (peerInfo.type === 'group')
|
||||
return {
|
||||
_: 'inputPeerChat',
|
||||
chatId: -peerInfo.id,
|
||||
}
|
||||
|
||||
if (peerInfo.type === 'channel' || peerInfo.type === 'supergroup')
|
||||
return {
|
||||
_: 'inputPeerChannel',
|
||||
channelId: MAX_CHANNEL_ID - peerInfo.id,
|
||||
accessHash: peerInfo.accessHash,
|
||||
}
|
||||
|
||||
throw new Error(`Invalid peer type: ${peerInfo.type}`)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -135,15 +112,14 @@ export interface ITelegramStorage {
|
|||
getChannelPts(entityId: number): MaybeAsync<number | null>
|
||||
/**
|
||||
* Set common `pts` and `date` values
|
||||
*
|
||||
* `null` values in the tuple are replaced with the current value,
|
||||
* `null` as a `val` will remove common pts
|
||||
*/
|
||||
setCommonPts(val: [number | null, number | null] | null): MaybeAsync<void>
|
||||
setCommonPts(val: [number, number]): MaybeAsync<void>
|
||||
/**
|
||||
* Set channel `pts` value
|
||||
* Set channels `pts` values in batch.
|
||||
* Storage is supposed to replace stored channel `pts` values
|
||||
* with given in the object (key is unmarked peer id, value is the `pts`)
|
||||
*/
|
||||
setChannelPts(entityId: number, pts: number | null): MaybeAsync<void>
|
||||
setManyChannelPts(values: Record<number, number>): MaybeAsync<void>
|
||||
|
||||
// TODO!
|
||||
// exportToString(): MaybeAsync<string>
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import { ITelegramStorage } from './abstract'
|
||||
import { MaybeAsync } from '../types'
|
||||
import { tl } from '@mtcute/tl'
|
||||
import { MAX_CHANNEL_ID } from '../utils/peer-utils'
|
||||
|
||||
const CURRENT_VERSION = 1
|
||||
|
||||
|
@ -18,7 +19,9 @@ interface MemorySessionState {
|
|||
// username -> peer id
|
||||
usernameIndex: Record<string, number>
|
||||
|
||||
// common pts, date
|
||||
gpts: [number, number] | null
|
||||
// channel pts
|
||||
pts: Record<number, number>
|
||||
|
||||
self: ITelegramStorage.SelfInfo | null
|
||||
|
@ -99,6 +102,9 @@ export class MemoryStorage implements ITelegramStorage {
|
|||
peer.updated = Date.now()
|
||||
const old = this._state.entities[peer.id]
|
||||
if (old) {
|
||||
// min peer
|
||||
if (peer.fromMessage) continue
|
||||
|
||||
// delete old index entries if needed
|
||||
if (old.username && old.username !== peer.username) {
|
||||
delete this._state.usernameIndex[old.username]
|
||||
|
@ -115,16 +121,60 @@ export class MemoryStorage implements ITelegramStorage {
|
|||
}
|
||||
}
|
||||
|
||||
protected _getInputPeer(peerInfo?: ITelegramStorage.PeerInfo): tl.TypeInputPeer | null {
|
||||
if (!peerInfo) return null
|
||||
if (peerInfo.type === 'user' || peerInfo.type === 'bot') {
|
||||
if (peerInfo.fromMessage) {
|
||||
return {
|
||||
_: 'inputPeerUserFromMessage',
|
||||
peer: this.getPeerById(peerInfo.fromMessage[0])!,
|
||||
msgId: peerInfo.fromMessage[1],
|
||||
userId: peerInfo.id
|
||||
}
|
||||
}
|
||||
return {
|
||||
_: 'inputPeerUser',
|
||||
userId: peerInfo.id,
|
||||
accessHash: peerInfo.accessHash,
|
||||
}
|
||||
}
|
||||
|
||||
if (peerInfo.type === 'group')
|
||||
return {
|
||||
_: 'inputPeerChat',
|
||||
chatId: -peerInfo.id,
|
||||
}
|
||||
|
||||
if (peerInfo.type === 'channel' || peerInfo.type === 'supergroup') {
|
||||
if (peerInfo.fromMessage) {
|
||||
return {
|
||||
_: 'inputPeerChannelFromMessage',
|
||||
peer: this.getPeerById(peerInfo.fromMessage[0])!,
|
||||
msgId: peerInfo.fromMessage[1],
|
||||
channelId: peerInfo.id
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
_: 'inputPeerChannel',
|
||||
channelId: MAX_CHANNEL_ID - peerInfo.id,
|
||||
accessHash: peerInfo.accessHash,
|
||||
}
|
||||
}
|
||||
|
||||
throw new Error(`Invalid peer type: ${peerInfo.type}`)
|
||||
}
|
||||
|
||||
getPeerById(peerId: number): tl.TypeInputPeer | null {
|
||||
if (peerId in this._cachedInputPeers)
|
||||
return this._cachedInputPeers[peerId]
|
||||
const peer = ITelegramStorage.getInputPeer(this._state.entities[peerId])
|
||||
const peer = this._getInputPeer(this._state.entities[peerId])
|
||||
if (peer) this._cachedInputPeers[peerId] = peer
|
||||
return peer
|
||||
}
|
||||
|
||||
getPeerByPhone(phone: string): tl.TypeInputPeer | null {
|
||||
return ITelegramStorage.getInputPeer(
|
||||
return this._getInputPeer(
|
||||
this._state.entities[this._state.phoneIndex[phone]]
|
||||
)
|
||||
}
|
||||
|
@ -137,7 +187,7 @@ export class MemoryStorage implements ITelegramStorage {
|
|||
|
||||
if (Date.now() - peer.updated > USERNAME_TTL) return null
|
||||
|
||||
return ITelegramStorage.getInputPeer(peer)
|
||||
return this._getInputPeer(peer)
|
||||
}
|
||||
|
||||
getSelf(): ITelegramStorage.SelfInfo | null {
|
||||
|
@ -148,28 +198,18 @@ export class MemoryStorage implements ITelegramStorage {
|
|||
this._state.self = self
|
||||
}
|
||||
|
||||
setChannelPts(entityId: number, pts: number | null): void {
|
||||
if (pts !== null) {
|
||||
this._state.pts[entityId] = pts
|
||||
} else {
|
||||
delete this._state.pts[entityId]
|
||||
}
|
||||
setManyChannelPts(values: Record<number, number>): void {
|
||||
Object.keys(values).forEach((id: any) => {
|
||||
this._state.pts[id] = values[id]
|
||||
})
|
||||
}
|
||||
|
||||
getChannelPts(entityId: number): number | null {
|
||||
return this._state.pts[entityId] ?? null
|
||||
}
|
||||
|
||||
setCommonPts(val: [number | null, number | null] | null): void {
|
||||
if (val) {
|
||||
if (this._state.gpts) {
|
||||
if (val[0] === null) val[0] = this._state.gpts[0]
|
||||
if (val[1] === null) val[1] = this._state.gpts[1]
|
||||
} else {
|
||||
val = null
|
||||
}
|
||||
}
|
||||
this._state.gpts = val as [number, number] | null
|
||||
setCommonPts(val: [number, number]): void {
|
||||
this._state.gpts = val
|
||||
}
|
||||
|
||||
getCommonPts(): [number, number] | null {
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import { tl } from '@mtcute/tl'
|
||||
import { BasicPeerType, MaybeArray, PeerType } from '../types'
|
||||
import { BasicPeerType, PeerType } from '../types'
|
||||
|
||||
export const MIN_CHANNEL_ID = -1002147483647
|
||||
export const MAX_CHANNEL_ID = -1000000000000
|
||||
|
@ -92,72 +92,167 @@ export function peerTypeToBasic(type: PeerType): BasicPeerType {
|
|||
throw new Error('Invalid peer type')
|
||||
}
|
||||
|
||||
function comparePeers(
|
||||
first: tl.TypePeer | undefined,
|
||||
second: tl.TypePeer | tl.TypeUser | tl.TypeChat
|
||||
): boolean {
|
||||
if (!first) return false
|
||||
|
||||
if ('userId' in first) {
|
||||
if ('userId' in second) return first.userId === second.userId
|
||||
if (second._ === 'user' || second._ === 'userEmpty')
|
||||
return first.userId === second.id
|
||||
}
|
||||
if ('chatId' in first) {
|
||||
if ('chatId' in second) return first.chatId === second.chatId
|
||||
if (
|
||||
second._ === 'chat' ||
|
||||
second._ === 'chatForbidden' ||
|
||||
second._ === 'chatEmpty'
|
||||
)
|
||||
return first.chatId === second.id
|
||||
}
|
||||
if ('channelId' in first) {
|
||||
if ('channelId' in second) return first.channelId === second.channelId
|
||||
if (second._ === 'channel' || second._ === 'channelForbidden')
|
||||
return first.channelId === second.id
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
function isRefMessage(msg: tl.TypeMessage, peer: any): boolean | undefined {
|
||||
return (
|
||||
comparePeers(msg.peerId, peer) ||
|
||||
('fromId' in msg && comparePeers(msg.fromId, peer)) ||
|
||||
('fwdFrom' in msg && msg.fwdFrom && comparePeers(msg.fwdFrom.fromId, peer))
|
||||
)
|
||||
}
|
||||
|
||||
function findContext(obj: any, peer: any): [number, number] | undefined {
|
||||
if (!peer.min) return undefined
|
||||
if (obj._ === 'updates' || obj._ === 'updatesCombined') {
|
||||
for (const upd of obj.updates as tl.TypeUpdate[]) {
|
||||
if (
|
||||
(upd._ === 'updateNewMessage' ||
|
||||
upd._ === 'updateNewChannelMessage' ||
|
||||
upd._ === 'updateEditMessage' ||
|
||||
upd._ === 'updateEditChannelMessage') &&
|
||||
isRefMessage(upd.message, peer)
|
||||
) {
|
||||
return [getMarkedPeerId(upd.message.peerId!), upd.message.id]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (obj._ === 'updateShortMessage') {
|
||||
return [obj.userId, obj.id]
|
||||
}
|
||||
|
||||
if (obj._ === 'updateShortChatMessage') {
|
||||
return [-obj.chatId, obj.id]
|
||||
}
|
||||
|
||||
if ('messages' in obj || 'newMessages' in obj) {
|
||||
for (const msg of (obj.messages ||
|
||||
obj.newMessages) as tl.TypeMessage[]) {
|
||||
if (isRefMessage(msg, peer)) {
|
||||
return [getMarkedPeerId(msg.peerId!), msg.id]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// im not sure if this is exhaustive check or not
|
||||
|
||||
return undefined
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts all (cacheable) entities from a TlObject or a list of them.
|
||||
* Only checks `.user`, `.chat`, `.channel`, `.users` and `.chats` properties
|
||||
*/
|
||||
export function* getAllPeersFrom(
|
||||
objects: MaybeArray<any>
|
||||
obj: any
|
||||
): Iterable<
|
||||
| tl.RawUser
|
||||
| tl.RawChat
|
||||
| tl.RawChatForbidden
|
||||
| tl.RawChannel
|
||||
| tl.RawChannelForbidden
|
||||
(tl.TypeUser | tl.TypeChat) & { fromMessage: [number, number] | undefined }
|
||||
> {
|
||||
if (!Array.isArray(objects)) objects = [objects]
|
||||
if (typeof obj !== 'object') return
|
||||
|
||||
for (const obj of objects) {
|
||||
if (typeof obj !== 'object') continue
|
||||
if (
|
||||
obj._ === 'user' ||
|
||||
obj._ === 'chat' ||
|
||||
obj._ === 'channel' ||
|
||||
obj._ === 'chatForbidden' ||
|
||||
obj._ === 'channelForbidden'
|
||||
) {
|
||||
yield obj
|
||||
return
|
||||
}
|
||||
if (obj._ === 'userFull') {
|
||||
yield obj.user
|
||||
return
|
||||
}
|
||||
|
||||
if (
|
||||
'user' in obj &&
|
||||
typeof obj.user === 'object' &&
|
||||
obj.user._ === 'user'
|
||||
) {
|
||||
yield obj.user
|
||||
}
|
||||
if (
|
||||
'user' in obj &&
|
||||
typeof obj.user === 'object' &&
|
||||
obj.user._ === 'user'
|
||||
) {
|
||||
yield obj.user
|
||||
}
|
||||
|
||||
if (
|
||||
'chat' in obj &&
|
||||
typeof obj.chat === 'object' &&
|
||||
(obj.chat._ === 'chat' ||
|
||||
obj.chat._ === 'channel' ||
|
||||
obj.chat._ === 'chatForbidden' ||
|
||||
obj.chat._ === 'channelForbidden')
|
||||
) {
|
||||
yield obj.chat
|
||||
}
|
||||
if (
|
||||
'chat' in obj &&
|
||||
typeof obj.chat === 'object' &&
|
||||
(obj.chat._ === 'chat' ||
|
||||
obj.chat._ === 'channel' ||
|
||||
obj.chat._ === 'chatForbidden' ||
|
||||
obj.chat._ === 'channelForbidden')
|
||||
) {
|
||||
yield obj.chat
|
||||
}
|
||||
|
||||
if (
|
||||
'channel' in obj &&
|
||||
typeof obj.channel === 'object' &&
|
||||
(obj.channel._ === 'chat' ||
|
||||
obj.channel._ === 'channel' ||
|
||||
obj.channel._ === 'chatForbidden' ||
|
||||
obj.channel._ === 'channelForbidden')
|
||||
) {
|
||||
yield obj.channel
|
||||
}
|
||||
if (
|
||||
'channel' in obj &&
|
||||
typeof obj.channel === 'object' &&
|
||||
(obj.channel._ === 'chat' ||
|
||||
obj.channel._ === 'channel' ||
|
||||
obj.channel._ === 'chatForbidden' ||
|
||||
obj.channel._ === 'channelForbidden')
|
||||
) {
|
||||
yield obj.channel
|
||||
}
|
||||
|
||||
if ('users' in obj && Array.isArray(obj.users) && obj.users.length) {
|
||||
for (const user of obj.users) {
|
||||
// .users is sometimes number[]
|
||||
if (typeof user === 'object' && user._ === 'user') yield user
|
||||
if ('users' in obj && Array.isArray(obj.users) && obj.users.length) {
|
||||
for (const user of obj.users) {
|
||||
// .users is sometimes number[]
|
||||
if (typeof user === 'object' && user._ === 'user') {
|
||||
if (user.min && !user.bot) {
|
||||
// min seems to be set for @Channel_Bot,
|
||||
// but we don't really need to cache its context
|
||||
// (we don't need to cache it at all, really, but whatever)
|
||||
user.fromMessage = findContext(obj, user)
|
||||
}
|
||||
|
||||
yield user
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if ('chats' in obj && Array.isArray(obj.chats) && obj.chats.length) {
|
||||
for (const chat of obj.chats) {
|
||||
// .chats is sometimes number[]
|
||||
if (
|
||||
typeof chat === 'object' &&
|
||||
(chat._ === 'chat' ||
|
||||
chat._ === 'channel' ||
|
||||
chat._ === 'chatForbidden' ||
|
||||
chat._ === 'channelForbidden')
|
||||
)
|
||||
yield chat
|
||||
if ('chats' in obj && Array.isArray(obj.chats) && obj.chats.length) {
|
||||
for (const chat of obj.chats) {
|
||||
// .chats is sometimes number[]
|
||||
if (
|
||||
typeof chat === 'object' &&
|
||||
(chat._ === 'chat' ||
|
||||
chat._ === 'channel' ||
|
||||
chat._ === 'chatForbidden' ||
|
||||
chat._ === 'channelForbidden')
|
||||
) {
|
||||
if (chat.min) {
|
||||
chat.fromMessage = findContext(obj, chat)
|
||||
}
|
||||
|
||||
yield chat
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue