feat: support for dc switches, session import/export, rpc sending

This commit is contained in:
alina 🌸 2023-08-11 23:59:24 +03:00
parent 13c0a97a58
commit 01d476d19a
Signed by: teidesu
SSH key fingerprint: SHA256:uNeCpw6aTSU4aIObXLvHfLkDa82HWH9EiOj9AXOIRpI
16 changed files with 761 additions and 389 deletions

View file

@ -131,7 +131,6 @@ import { getMessages } from './methods/messages/get-messages'
import { getMessagesUnsafe } from './methods/messages/get-messages-unsafe' import { getMessagesUnsafe } from './methods/messages/get-messages-unsafe'
import { getReactionUsers } from './methods/messages/get-reaction-users' import { getReactionUsers } from './methods/messages/get-reaction-users'
import { getScheduledMessages } from './methods/messages/get-scheduled-messages' import { getScheduledMessages } from './methods/messages/get-scheduled-messages'
import { _normalizeInline } from './methods/messages/normalize-inline'
import { _parseEntities } from './methods/messages/parse-entities' import { _parseEntities } from './methods/messages/parse-entities'
import { pinMessage } from './methods/messages/pin-message' import { pinMessage } from './methods/messages/pin-message'
import { readHistory } from './methods/messages/read-history' import { readHistory } from './methods/messages/read-history'
@ -2759,10 +2758,6 @@ export interface TelegramClient extends BaseTelegramClient {
messageIds: number[] messageIds: number[]
): Promise<(Message | null)[]> ): Promise<(Message | null)[]>
_normalizeInline(
id: string | tl.TypeInputBotInlineMessageID
): Promise<[tl.TypeInputBotInlineMessageID, SessionConnection]>
_parseEntities( _parseEntities(
text?: string | FormattedString<string>, text?: string | FormattedString<string>,
mode?: string | null, mode?: string | null,
@ -4025,7 +4020,6 @@ export class TelegramClient extends BaseTelegramClient {
protected _pendingConversations: Record<number, Conversation[]> protected _pendingConversations: Record<number, Conversation[]>
protected _hasConversations: boolean protected _hasConversations: boolean
protected _downloadConnections: Record<number, SessionConnection> protected _downloadConnections: Record<number, SessionConnection>
protected _connectionsForInline: Record<number, SessionConnection>
protected _parseModes: Record<string, IMessageEntityParser> protected _parseModes: Record<string, IMessageEntityParser>
protected _defaultParseMode: string | null protected _defaultParseMode: string | null
protected _updatesLoopActive: boolean protected _updatesLoopActive: boolean
@ -4061,7 +4055,6 @@ export class TelegramClient extends BaseTelegramClient {
this._pendingConversations = {} this._pendingConversations = {}
this._hasConversations = false this._hasConversations = false
this._downloadConnections = {} this._downloadConnections = {}
this._connectionsForInline = {}
this._parseModes = {} this._parseModes = {}
this._defaultParseMode = null this._defaultParseMode = null
this._updatesLoopActive = false this._updatesLoopActive = false
@ -4213,7 +4206,6 @@ export class TelegramClient extends BaseTelegramClient {
getMessages = getMessages getMessages = getMessages
getReactionUsers = getReactionUsers getReactionUsers = getReactionUsers
getScheduledMessages = getScheduledMessages getScheduledMessages = getScheduledMessages
_normalizeInline = _normalizeInline
_parseEntities = _parseEntities _parseEntities = _parseEntities
pinMessage = pinMessage pinMessage = pinMessage
readHistory = readHistory readHistory = readHistory

View file

@ -1,12 +1,8 @@
import { tl } from '@mtcute/tl' import { tl } from '@mtcute/tl'
import { TelegramClient } from '../../client' import { TelegramClient } from '../../client'
import { import { GameHighScore, InputPeerLike, PeersIndex } from '../../types'
GameHighScore, import { normalizeInlineId } from '../../utils/inline-utils'
InputPeerLike,
MtInvalidPeerTypeError,
PeersIndex,
} from '../../types'
import { normalizeToInputUser } from '../../utils/peer-utils' import { normalizeToInputUser } from '../../utils/peer-utils'
/** /**
@ -57,7 +53,7 @@ export async function getInlineGameHighScores(
messageId: string | tl.TypeInputBotInlineMessageID, messageId: string | tl.TypeInputBotInlineMessageID,
userId?: InputPeerLike, userId?: InputPeerLike,
): Promise<GameHighScore[]> { ): Promise<GameHighScore[]> {
const [id, connection] = await this._normalizeInline(messageId) const id = await normalizeInlineId(messageId)
let user: tl.TypeInputUser let user: tl.TypeInputUser
@ -73,7 +69,7 @@ export async function getInlineGameHighScores(
id, id,
userId: user, userId: user,
}, },
{ connection }, { dcId: id.dcId },
) )
const peers = PeersIndex.from(res) const peers = PeersIndex.from(res)

View file

@ -2,6 +2,7 @@ import { tl } from '@mtcute/tl'
import { TelegramClient } from '../../client' import { TelegramClient } from '../../client'
import { InputPeerLike, Message, MtInvalidPeerTypeError } from '../../types' import { InputPeerLike, Message, MtInvalidPeerTypeError } from '../../types'
import { normalizeInlineId } from '../../utils/inline-utils'
import { normalizeToInputUser } from '../../utils/peer-utils' import { normalizeToInputUser } from '../../utils/peer-utils'
/** /**
@ -86,7 +87,7 @@ export async function setInlineGameScore(
const user = normalizeToInputUser(await this.resolvePeer(userId), userId) const user = normalizeToInputUser(await this.resolvePeer(userId), userId)
const [id, connection] = await this._normalizeInline(messageId) const id = await normalizeInlineId(messageId)
await this.call( await this.call(
{ {
@ -97,6 +98,6 @@ export async function setInlineGameScore(
editMessage: !params.noEdit, editMessage: !params.noEdit,
force: params.force, force: params.force,
}, },
{ connection }, { dcId: id.dcId },
) )
} }

View file

@ -7,6 +7,7 @@ import {
InputMediaLike, InputMediaLike,
ReplyMarkup, ReplyMarkup,
} from '../../types' } from '../../types'
import { normalizeInlineId } from '../../utils/inline-utils'
/** /**
* Edit sent inline message text, media and reply markup. * Edit sent inline message text, media and reply markup.
@ -75,7 +76,7 @@ export async function editInlineMessage(
let entities: tl.TypeMessageEntity[] | undefined let entities: tl.TypeMessageEntity[] | undefined
let media: tl.TypeInputMedia | undefined = undefined let media: tl.TypeInputMedia | undefined = undefined
const [id, connection] = await this._normalizeInline(messageId) const id = await normalizeInlineId(messageId)
if (params.media) { if (params.media) {
media = await this._normalizeInputMedia(params.media, params, true) media = await this._normalizeInputMedia(params.media, params, true)
@ -111,7 +112,7 @@ export async function editInlineMessage(
entities, entities,
media, media,
}, },
{ connection }, { dcId: id.dcId },
) )
return return

View file

@ -1,39 +0,0 @@
import { SessionConnection } from '@mtcute/core'
import { tl } from '@mtcute/tl'
import { TelegramClient } from '../../client'
import { parseInlineMessageId } from '../../utils/inline-utils'
// @extension
interface InlineExtension {
_connectionsForInline: Record<number, SessionConnection>
}
// @initialize
function _initializeInline(this: TelegramClient) {
this._connectionsForInline = {}
}
/** @internal */
export async function _normalizeInline(
this: TelegramClient,
id: string | tl.TypeInputBotInlineMessageID,
): Promise<[tl.TypeInputBotInlineMessageID, SessionConnection]> {
if (typeof id === 'string') {
id = parseInlineMessageId(id)
}
// let connection = this.primaryConnection
//
// if (id.dcId !== connection.params.dc.id) {
// if (!(id.dcId in this._connectionsForInline)) {
// this._connectionsForInline[id.dcId] =
// await this.createAdditionalConnection(id.dcId)
// }
// connection = this._connectionsForInline[id.dcId]
// }
//
// return [id, connection]
// fixme
throw new Error('TODO')
}

View file

@ -1,4 +1,4 @@
import { MustEqual } from '@mtcute/core' import { MustEqual, RpcCallOptions } from '@mtcute/core'
import { tl } from '@mtcute/tl' import { tl } from '@mtcute/tl'
import { TelegramClient } from '../../client' import { TelegramClient } from '../../client'
@ -31,9 +31,7 @@ export class TakeoutSession {
*/ */
async call<T extends tl.RpcMethod>( async call<T extends tl.RpcMethod>(
message: MustEqual<T, tl.RpcMethod>, message: MustEqual<T, tl.RpcMethod>,
params?: { params?: RpcCallOptions,
throwFlood: boolean
},
): Promise<tl.RpcCallReturn[T['_']]> { ): Promise<tl.RpcCallReturn[T['_']]> {
return this.client.call( return this.client.call(
{ {

View file

@ -66,3 +66,11 @@ export function encodeInlineMessageId(
return encodeUrlSafeBase64(writer.result()) return encodeUrlSafeBase64(writer.result())
} }
export function normalizeInlineId(id: string | tl.TypeInputBotInlineMessageID) {
if (typeof id === 'string') {
return parseInlineMessageId(id)
}
return id
}

View file

@ -13,7 +13,11 @@ import {
TransportFactory, TransportFactory,
} from './network' } from './network'
import { ConfigManager } from './network/config-manager' import { ConfigManager } from './network/config-manager'
import { NetworkManager, NetworkManagerExtraParams } from './network/network-manager' import {
NetworkManager,
NetworkManagerExtraParams,
RpcCallOptions,
} from './network/network-manager'
import { PersistentConnectionParams } from './network/persistent-connection' import { PersistentConnectionParams } from './network/persistent-connection'
import { ITelegramStorage, MemoryStorage } from './storage' import { ITelegramStorage, MemoryStorage } from './storage'
import { MustEqual } from './types' import { MustEqual } from './types'
@ -30,7 +34,6 @@ import {
ICryptoProvider, ICryptoProvider,
LogManager, LogManager,
readStringSession, readStringSession,
sleep,
toggleChannelIdMark, toggleChannelIdMark,
writeStringSession, writeStringSession,
} from './utils' } from './utils'
@ -123,7 +126,7 @@ export interface BaseTelegramClientOptions {
* *
* @default 5 * @default 5
*/ */
rpcRetryCount?: number maxRetryCount?: number
/** /**
* If true, every single API call will be wrapped with `tl.invokeWithoutUpdates`, * If true, every single API call will be wrapped with `tl.invokeWithoutUpdates`,
@ -207,16 +210,6 @@ export class BaseTelegramClient extends EventEmitter {
*/ */
protected readonly _testMode: boolean protected readonly _testMode: boolean
/**
* Flood sleep threshold taken from {@link BaseTelegramClientOptions.floodSleepThreshold}
*/
protected readonly _floodSleepThreshold: number
/**
* RPC retry count taken from {@link BaseTelegramClientOptions.rpcRetryCount}
*/
protected readonly _rpcRetryCount: number
/** /**
* Primary DC taken from {@link BaseTelegramClientOptions.defaultDc}, * Primary DC taken from {@link BaseTelegramClientOptions.defaultDc},
* loaded from session or changed by other means (like redirecting). * loaded from session or changed by other means (like redirecting).
@ -229,7 +222,6 @@ export class BaseTelegramClient extends EventEmitter {
readonly _writerMap: TlWriterMap readonly _writerMap: TlWriterMap
protected _lastUpdateTime = 0 protected _lastUpdateTime = 0
private _floodWaitedRequests: Record<string, number> = {}
protected _config = new ConfigManager(() => protected _config = new ConfigManager(() =>
this.call({ _: 'help.getConfig' }), this.call({ _: 'help.getConfig' }),
@ -287,15 +279,14 @@ export class BaseTelegramClient extends EventEmitter {
} }
this._defaultDc = dc this._defaultDc = dc
this._floodSleepThreshold = opts.floodSleepThreshold ?? 10000
this._rpcRetryCount = opts.rpcRetryCount ?? 5
this._niceStacks = opts.niceStacks ?? true this._niceStacks = opts.niceStacks ?? true
this._layer = opts.overrideLayer ?? tl.LAYER this._layer = opts.overrideLayer ?? tl.LAYER
this._readerMap = opts.readerMap ?? defaultReaderMap this._readerMap = opts.readerMap ?? defaultReaderMap
this._writerMap = opts.writerMap ?? defaultWriterMap this._writerMap = opts.writerMap ?? defaultWriterMap
this.network = new NetworkManager({ this.network = new NetworkManager(
{
apiId, apiId,
crypto: this._crypto, crypto: this._crypto,
disableUpdates: opts.disableUpdates ?? false, disableUpdates: opts.disableUpdates ?? false,
@ -309,8 +300,14 @@ export class BaseTelegramClient extends EventEmitter {
testMode: this._testMode, testMode: this._testMode,
transport: opts.transport, transport: opts.transport,
_emitError: this._emitError.bind(this), _emitError: this._emitError.bind(this),
floodSleepThreshold: opts.floodSleepThreshold ?? 10000,
maxRetryCount: opts.maxRetryCount ?? 5,
isPremium: false, // todo fixme
useIpv6: Boolean(opts.useIpv6),
...(opts.network ?? {}), ...(opts.network ?? {}),
}, this._config) },
this._config,
)
this.storage.setup?.(this.log, this._readerMap, this._writerMap) this.storage.setup?.(this.log, this._readerMap, this._writerMap)
} }
@ -338,25 +335,27 @@ export class BaseTelegramClient extends EventEmitter {
return return
} }
this._connected = createControllablePromise() const promise = (this._connected = createControllablePromise())
await this._loadStorage() await this._loadStorage()
const primaryDc = await this.storage.getDefaultDc() const primaryDc = await this.storage.getDefaultDc()
if (primaryDc !== null) this._defaultDc = primaryDc if (primaryDc !== null) this._defaultDc = primaryDc
const defaultDcAuthKey = await this.storage.getAuthKeyFor(this._defaultDc.id) const defaultDcAuthKey = await this.storage.getAuthKeyFor(
this._defaultDc.id,
)
// await this.primaryConnection.setupKeys() if ((this._importForce || !defaultDcAuthKey) && this._importFrom) {
if (
(this._importForce || !defaultDcAuthKey) &&
this._importFrom
) {
const data = readStringSession(this._readerMap, this._importFrom) const data = readStringSession(this._readerMap, this._importFrom)
if (data.testMode !== !this._testMode) { if (data.testMode !== this._testMode) {
throw new Error( throw new Error(
'This session string is not for the current backend', 'This session string is not for the current backend. ' +
`Session is ${
data.testMode ? 'test' : 'prod'
}, but the client is ${
this._testMode ? 'test' : 'prod'
}`,
) )
} }
@ -373,10 +372,13 @@ export class BaseTelegramClient extends EventEmitter {
await this._saveStorage(true) await this._saveStorage(true)
} }
this.network.connect(this._defaultDc) this.network
.connect(this._defaultDc)
this._connected.resolve() .then(() => {
promise.resolve()
this._connected = true this._connected = true
})
.catch((err) => this._emitError(err))
} }
/** /**
@ -449,122 +451,18 @@ export class BaseTelegramClient extends EventEmitter {
*/ */
async call<T extends tl.RpcMethod>( async call<T extends tl.RpcMethod>(
message: MustEqual<T, tl.RpcMethod>, message: MustEqual<T, tl.RpcMethod>,
params?: { params?: RpcCallOptions,
throwFlood?: boolean
connection?: SessionConnection
timeout?: number
},
): Promise<tl.RpcCallReturn[T['_']]> { ): Promise<tl.RpcCallReturn[T['_']]> {
// todo move to network manager
if (this._connected !== true) { if (this._connected !== true) {
await this.connect() await this.connect()
} }
// do not send requests that are in flood wait
if (message._ in this._floodWaitedRequests) {
const delta = this._floodWaitedRequests[message._] - Date.now()
if (delta <= 3000) {
// flood waits below 3 seconds are "ignored"
delete this._floodWaitedRequests[message._]
} else if (delta <= this._floodSleepThreshold) {
await sleep(delta)
delete this._floodWaitedRequests[message._]
} else {
throw new tl.errors.FloodWaitXError(delta / 1000)
}
}
let lastError: Error | null = null
const stack = this._niceStacks ? new Error().stack : undefined const stack = this._niceStacks ? new Error().stack : undefined
for (let i = 0; i < this._rpcRetryCount; i++) { const res = await this.network.call(message, params, stack)
try {
// fixme temporary hack
// eslint-disable-next-line dot-notation
const res = await this.network['_primaryDc']!.mainConnection.sendRpc(
message,
stack,
params?.timeout,
)
await this._cachePeersFrom(res) await this._cachePeersFrom(res)
return res return res
} catch (e: any) {
lastError = e
if (e instanceof tl.errors.InternalError) {
this.log.warn('Telegram is having internal issues: %s', e)
if (e.message === 'WORKER_BUSY_TOO_LONG_RETRY') {
// according to tdlib, "it is dangerous to resend query without timeout, so use 1"
await sleep(1000)
}
continue
}
if (
e.constructor === tl.errors.FloodWaitXError ||
e.constructor === tl.errors.SlowmodeWaitXError ||
e.constructor === tl.errors.FloodTestPhoneWaitXError
) {
if (e.constructor !== tl.errors.SlowmodeWaitXError) {
// SLOW_MODE_WAIT is chat-specific, not request-specific
this._floodWaitedRequests[message._] =
Date.now() + e.seconds * 1000
}
// In test servers, FLOOD_WAIT_0 has been observed, and sleeping for
// such a short amount will cause retries very fast leading to issues
if (e.seconds === 0) {
(e as any).seconds = 1
}
if (
params?.throwFlood !== true &&
e.seconds <= this._floodSleepThreshold
) {
this.log.info('Flood wait for %d seconds', e.seconds)
await sleep(e.seconds * 1000)
continue
}
}
// if (connection.params.dc.id === this._defaultDc.id) {
// if (
// e.constructor === tl.errors.PhoneMigrateXError ||
// e.constructor === tl.errors.UserMigrateXError ||
// e.constructor === tl.errors.NetworkMigrateXError
// ) {
// this.log.info('Migrate error, new dc = %d', e.new_dc)
// await this.changeDc(e.new_dc)
// continue
// }
// } else {
// if (e.constructor === tl.errors.AuthKeyUnregisteredError) {
// // we can try re-exporting auth from the primary connection
// this.log.warn('exported auth key error, re-exporting..')
//
// const auth = await this.call({
// _: 'auth.exportAuthorization',
// dcId: connection.params.dc.id,
// })
//
// await connection.sendRpc({
// _: 'auth.importAuthorization',
// id: auth.id,
// bytes: auth.bytes,
// })
//
// continue
// }
// }
throw e
}
}
throw lastError
} }
// /** // /**
@ -707,7 +605,9 @@ export class BaseTelegramClient extends EventEmitter {
* the connection in which the error has occurred, in case * the connection in which the error has occurred, in case
* this was connection-related error. * this was connection-related error.
*/ */
onError(handler: typeof this._onError): void { onError(
handler: (err: unknown, connection?: SessionConnection) => void,
): void {
this._onError = handler this._onError = handler
} }
@ -816,16 +716,18 @@ export class BaseTelegramClient extends EventEmitter {
* > with [@BotFather](//t.me/botfather) * > with [@BotFather](//t.me/botfather)
*/ */
async exportSession(): Promise<string> { async exportSession(): Promise<string> {
// todo const primaryDc = await this.storage.getDefaultDc()
// if (!this.primaryConnection.getAuthKey()) if (!primaryDc) throw new Error('No default DC set')
// throw new Error('Auth key is not generated yet')
const authKey = await this.storage.getAuthKeyFor(primaryDc.id)
if (!authKey) throw new Error('Auth key is not ready yet')
return writeStringSession(this._writerMap, { return writeStringSession(this._writerMap, {
version: 1, version: 1,
self: await this.storage.getSelf(), self: await this.storage.getSelf(),
testMode: this._testMode, testMode: this._testMode,
primaryDc: this._defaultDc, primaryDc,
authKey: Buffer.from([]), //this.primaryConnection.getAuthKey()!, authKey,
}) })
} }
@ -837,7 +739,7 @@ export class BaseTelegramClient extends EventEmitter {
* *
* Also note that the session will only be imported in case * Also note that the session will only be imported in case
* the storage is missing authorization (i.e. does not contain * the storage is missing authorization (i.e. does not contain
* auth key for the primary DC), otherwise it will be ignored. * auth key for the primary DC), otherwise it will be ignored (unless `force).
* *
* @param session Session string to import * @param session Session string to import
* @param force Whether to overwrite existing session * @param force Whether to overwrite existing session

View file

@ -70,12 +70,11 @@ export class ConfigManager {
if (this.isStale) await this.update() if (this.isStale) await this.update()
const options = this._config!.dcOptions.filter((opt) => { const options = this._config!.dcOptions.filter((opt) => {
if (opt.id === params.dcId) return true if (opt.tcpoOnly) return false // unsupported
if (opt.ipv6 && !params.allowIpv6) return false if (opt.ipv6 && !params.allowIpv6) return false
if (opt.cdn && !params.cdn) return false if (opt.cdn && !params.cdn) return false
if (opt.tcpoOnly) return false // unsupported
return true return opt.id === params.dcId
}) })
if (params.preferMedia && params.preferIpv6) { if (params.preferMedia && params.preferIpv6) {

View file

@ -1,3 +1,4 @@
export { NetworkManagerExtraParams, RpcCallOptions } from './network-manager'
export * from './reconnection' export * from './reconnection'
export * from './session-connection' export * from './session-connection'
export * from './transports' export * from './transports'

View file

@ -18,9 +18,11 @@ export class MultiSessionConnection extends EventEmitter {
readonly params: SessionConnectionParams, readonly params: SessionConnectionParams,
private _count: number, private _count: number,
log: Logger, log: Logger,
logPrefix = '',
) { ) {
super() super()
this._log = log.create('multi') this._log = log.create('multi')
if (logPrefix) this._log.prefix = `[${logPrefix}] `
this._enforcePfs = _count > 1 && params.isMainConnection this._enforcePfs = _count > 1 && params.isMainConnection
this._sessions = [] this._sessions = []
@ -183,9 +185,13 @@ export class MultiSessionConnection extends EventEmitter {
} }
} }
_destroyed = false
destroy(): void { destroy(): void {
this._connections.forEach((conn) => conn.destroy()) this._connections.forEach((conn) => conn.destroy())
this._sessions.forEach((sess) => sess.reset()) this._sessions.forEach((sess) => sess.reset())
this.removeAllListeners()
this._destroyed = true
} }
private _nextConnection = 0 private _nextConnection = 0
@ -222,18 +228,18 @@ export class MultiSessionConnection extends EventEmitter {
].sendRpc(request, stack, timeout) ].sendRpc(request, stack, timeout)
} }
async changeDc(dc: tl.RawDcOption, authKey?: Buffer | null): Promise<void> {
await Promise.all(
this._connections.map((conn) => conn.changeDc(dc, authKey)),
)
}
connect(): void { connect(): void {
for (const conn of this._connections) { for (const conn of this._connections) {
conn.connect() conn.connect()
} }
} }
ensureConnected(): void {
if (this._connections[0].isConnected) return
this.connect()
}
async setAuthKey( async setAuthKey(
authKey: Buffer | null, authKey: Buffer | null,
temp = false, temp = false,
@ -244,6 +250,41 @@ export class MultiSessionConnection extends EventEmitter {
await key.setup(authKey) await key.setup(authKey)
} }
setInactivityTimeout(timeout?: number): void {
this._log.debug('setting inactivity timeout to %s', timeout)
// for future connections (if any)
this.params.inactivityTimeout = timeout
// for current connections
for (const conn of this._connections) {
conn.setInactivityTimeout(timeout)
}
}
notifyKeyChange(): void {
// only expected to be called on non-main connections
const session = this._sessions[0]
if (this.params.usePfs && !session._authKeyTemp.ready) {
this._log.debug(
'temp auth key needed but not ready, ignoring key change',
)
return
}
if (this._sessions[0].queuedRpc.length) {
// there are pending requests, we need to reconnect.
this._log.debug(
'notifying key change on the connection due to queued rpc',
)
this._connections[0].onConnected()
}
// connection is idle, we don't need to notify it
}
requestAuth(): void { requestAuth(): void {
this._connections[0]._authorize() this._connections[0]._authorize()
} }

View file

@ -2,7 +2,7 @@ import { tl } from '@mtcute/tl'
import { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime' import { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime'
import { ITelegramStorage } from '../storage' import { ITelegramStorage } from '../storage'
import { ICryptoProvider, Logger } from '../utils' import { ICryptoProvider, Logger, sleep } from '../utils'
import { ConfigManager } from './config-manager' import { ConfigManager } from './config-manager'
import { MultiSessionConnection } from './multi-session-connection' import { MultiSessionConnection } from './multi-session-connection'
import { PersistentConnectionParams } from './persistent-connection' import { PersistentConnectionParams } from './persistent-connection'
@ -16,7 +16,7 @@ import {
} from './session-connection' } from './session-connection'
import { defaultTransportFactory, TransportFactory } from './transports' import { defaultTransportFactory, TransportFactory } from './transports'
export type ConnectionKind = 'main' | 'upload' | 'download' | 'download-small' export type ConnectionKind = 'main' | 'upload' | 'download' | 'downloadSmall'
/** /**
* Params passed into {@link NetworkManager} by {@link TelegramClient}. * Params passed into {@link NetworkManager} by {@link TelegramClient}.
@ -33,15 +33,40 @@ export interface NetworkManagerParams {
> >
transport?: TransportFactory transport?: TransportFactory
reconnectionStrategy?: ReconnectionStrategy<PersistentConnectionParams> reconnectionStrategy?: ReconnectionStrategy<PersistentConnectionParams>
floodSleepThreshold: number
maxRetryCount: number
disableUpdates?: boolean disableUpdates?: boolean
testMode: boolean testMode: boolean
layer: number layer: number
useIpv6: boolean
readerMap: TlReaderMap readerMap: TlReaderMap
writerMap: TlWriterMap writerMap: TlWriterMap
isPremium: boolean
_emitError: (err: Error, connection?: SessionConnection) => void _emitError: (err: Error, connection?: SessionConnection) => void
} }
export type ConnectionCountDelegate = (
kind: ConnectionKind,
dcId: number,
isPremium: boolean
) => number
const defaultConnectionCountDelegate: ConnectionCountDelegate = (
kind,
dcId,
isPremium,
) => {
switch (kind) {
case 'main':
return 1
case 'upload':
return isPremium || (dcId !== 2 && dcId !== 4) ? 8 : 4
case 'download':
case 'downloadSmall':
return isPremium ? 8 : 2
}
}
/** /**
* Additional params passed into {@link NetworkManager} by the user * Additional params passed into {@link NetworkManager} by the user
* that customize the behavior of the manager * that customize the behavior of the manager
@ -55,8 +80,65 @@ export interface NetworkManagerExtraParams {
/** /**
* Connection count for each connection kind * Connection count for each connection kind
*
* Defaults to TDLib logic:
* - main: 1 (should not be changed manually)
* - upload: if premium or dc id is other than 2 or 4, then 8, otherwise 4
* - download: if premium then 8, otherwise 2
* - downloadSmall: if premium then 8, otherwise 2
*/ */
connectionCount?: Partial<Record<ConnectionKind, number>> connectionCount?: ConnectionCountDelegate
/**
* Idle timeout for non-main connections, in ms
* Defaults to 60 seconds.
*/
inactivityTimeout?: number
}
export interface RpcCallOptions {
/**
* If the call results in a `FLOOD_WAIT_X` error,
* the maximum amount of time to wait before retrying.
*
* If set to `0`, the call will not be retried.
*
* @default {@link BaseTelegramClientOptions.floodSleepThreshold}
*/
floodSleepThreshold?: number
/**
* If the call results in an internal server error or a flood wait,
* the maximum amount of times to retry the call.
*
* @default {@link BaseTelegramClientOptions.maxRetryCount}
*/
maxRetryCount?: number
/**
* Timeout for the call, in milliseconds.
*
* @default Infinity
*/
timeout?: number
/**
* Kind of connection to use for this call.
*
* @default 'main'
*/
kind?: ConnectionKind
/**
* ID of the DC to use for this call
*/
dcId?: number
/**
* DC connection manager to use for this call.
* Overrides `dcId` if set.
*/
manager?: DcConnectionManager
} }
export class DcConnectionManager { export class DcConnectionManager {
@ -73,35 +155,87 @@ export class DcConnectionManager {
writerMap: this.manager.params.writerMap, writerMap: this.manager.params.writerMap,
usePfs: this.manager.params.usePfs, usePfs: this.manager.params.usePfs,
isMainConnection: false, isMainConnection: false,
inactivityTimeout: this.manager.params.inactivityTimeout ?? 60_000,
}) })
mainConnection = new MultiSessionConnection( private _log = this.manager._log.create('dc-manager')
{
...this.__baseConnectionParams(), main: MultiSessionConnection
isMainConnection: true,
}, upload = new MultiSessionConnection(
this.manager.params.connectionCount?.main ?? 1, this.__baseConnectionParams(),
this.manager._log, this.manager._connectionCount(
'upload',
this._dc.id,
this.manager.params.isPremium,
),
this._log,
'UPLOAD',
)
download = new MultiSessionConnection(
this.__baseConnectionParams(),
this.manager._connectionCount(
'download',
this._dc.id,
this.manager.params.isPremium,
),
this._log,
'DOWNLOAD',
)
downloadSmall = new MultiSessionConnection(
this.__baseConnectionParams(),
this.manager._connectionCount(
'downloadSmall',
this._dc.id,
this.manager.params.isPremium,
),
this._log,
'DOWNLOAD_SMALL',
) )
constructor( constructor(
readonly manager: NetworkManager, readonly manager: NetworkManager,
readonly dcId: number, readonly dcId: number,
private _dc: tl.RawDcOption, readonly _dc: tl.RawDcOption,
public isPrimary = false,
) { ) {
this._setupMulti(this.mainConnection, 'main') this._log.prefix = `[DC ${dcId}] `
const mainParams = this.__baseConnectionParams()
mainParams.isMainConnection = true
if (isPrimary) {
mainParams.inactivityTimeout = undefined
} }
private _setupMulti( this.main = new MultiSessionConnection(
connection: MultiSessionConnection, mainParams,
kind: ConnectionKind, this.manager._connectionCount(
): void { 'main',
this._dc.id,
this.manager.params.isPremium,
),
this._log,
'MAIN',
)
this._setupMulti('main')
this._setupMulti('upload')
this._setupMulti('download')
this._setupMulti('downloadSmall')
}
private _setupMulti(kind: ConnectionKind): void {
const connection = this[kind]
connection.on('key-change', (idx, key) => { connection.on('key-change', (idx, key) => {
if (kind !== 'main') { if (kind !== 'main') {
// main connection is responsible for authorization, // main connection is responsible for authorization,
// and keys are then sent to other connections // and keys are then sent to other connections
this.manager._log.warn( this.manager._log.warn(
'got key-change from non-main connection', 'got key-change from non-main connection, ignoring',
) )
return return
@ -115,12 +249,20 @@ export class DcConnectionManager {
this.manager._storage.setAuthKeyFor(this.dcId, key) this.manager._storage.setAuthKeyFor(this.dcId, key)
// send key to other connections // send key to other connections
// todo Promise.all([
this.upload.setAuthKey(key),
this.download.setAuthKey(key),
this.downloadSmall.setAuthKey(key),
]).then(() => {
this.upload.notifyKeyChange()
this.download.notifyKeyChange()
this.downloadSmall.notifyKeyChange()
})
}) })
connection.on('tmp-key-change', (idx, key, expires) => { connection.on('tmp-key-change', (idx, key, expires) => {
if (kind !== 'main') { if (kind !== 'main') {
this.manager._log.warn( this.manager._log.warn(
'got tmp-key-change from non-main connection', 'got tmp-key-change from non-main connection, ignoring',
) )
return return
@ -137,6 +279,17 @@ export class DcConnectionManager {
key, key,
expires * 1000, expires * 1000,
) )
// send key to other connections
Promise.all([
this.upload.setAuthKey(key, true),
this.download.setAuthKey(key, true),
this.downloadSmall.setAuthKey(key, true),
]).then(() => {
this.upload.notifyKeyChange()
this.download.notifyKeyChange()
this.downloadSmall.notifyKeyChange()
})
}) })
connection.on('auth-begin', () => { connection.on('auth-begin', () => {
@ -144,7 +297,7 @@ export class DcConnectionManager {
// to avoid them sending requests before auth is complete // to avoid them sending requests before auth is complete
if (kind !== 'main') { if (kind !== 'main') {
this.manager._log.warn( this.manager._log.warn(
'got auth-begin from non-main connection', 'got auth-begin from non-main connection, ignoring',
) )
return return
@ -156,24 +309,58 @@ export class DcConnectionManager {
}) })
connection.on('request-auth', () => { connection.on('request-auth', () => {
this.mainConnection.requestAuth() this.main.requestAuth()
}) })
} }
async loadKeys(): Promise<void> { setIsPrimary(isPrimary: boolean): void {
if (this.isPrimary === isPrimary) return
this.isPrimary = isPrimary
if (isPrimary) {
this.main.setInactivityTimeout(undefined)
} else {
this.main.setInactivityTimeout(
this.manager.params.inactivityTimeout ?? 60_000,
)
}
}
async loadKeys(): Promise<boolean> {
const permanent = await this.manager._storage.getAuthKeyFor(this.dcId) const permanent = await this.manager._storage.getAuthKeyFor(this.dcId)
await this.mainConnection.setAuthKey(permanent) await Promise.all([
this.main.setAuthKey(permanent),
this.upload.setAuthKey(permanent),
this.download.setAuthKey(permanent),
this.downloadSmall.setAuthKey(permanent),
])
if (!permanent) {
return false
}
if (this.manager.params.usePfs) { if (this.manager.params.usePfs) {
for (let i = 0; i < this.mainConnection._sessions.length; i++) { await Promise.all(
this.main._sessions.map(async (_, i) => {
const temp = await this.manager._storage.getAuthKeyFor( const temp = await this.manager._storage.getAuthKeyFor(
this.dcId, this.dcId,
i, i,
) )
await this.mainConnection.setAuthKey(temp, true, i) await this.main.setAuthKey(temp, true, i)
if (i === 0) {
await Promise.all([
this.upload.setAuthKey(temp, true),
this.download.setAuthKey(temp, true),
this.downloadSmall.setAuthKey(temp, true),
])
} }
}),
)
} }
return true
} }
} }
@ -184,6 +371,7 @@ export class NetworkManager {
readonly _initConnectionParams: tl.RawInitConnectionRequest readonly _initConnectionParams: tl.RawInitConnectionRequest
readonly _transportFactory: TransportFactory readonly _transportFactory: TransportFactory
readonly _reconnectionStrategy: ReconnectionStrategy<PersistentConnectionParams> readonly _reconnectionStrategy: ReconnectionStrategy<PersistentConnectionParams>
readonly _connectionCount: ConnectionCountDelegate
protected readonly _dcConnections: Record<number, DcConnectionManager> = {} protected readonly _dcConnections: Record<number, DcConnectionManager> = {}
protected _primaryDc?: DcConnectionManager protected _primaryDc?: DcConnectionManager
@ -252,20 +440,19 @@ export class NetworkManager {
this._transportFactory = params.transport ?? defaultTransportFactory this._transportFactory = params.transport ?? defaultTransportFactory
this._reconnectionStrategy = this._reconnectionStrategy =
params.reconnectionStrategy ?? defaultReconnectionStrategy params.reconnectionStrategy ?? defaultReconnectionStrategy
this._connectionCount =
// this._dcConnections[params.defaultDc?.id ?? 2] = params.connectionCount ?? defaultConnectionCountDelegate
// new DcConnectionManager(this, params.defaultDc?.id ?? 2)
} }
private _switchPrimaryDc(dc: DcConnectionManager) { private _switchPrimaryDc(dc: DcConnectionManager) {
if (this._primaryDc && this._primaryDc !== dc) { if (this._primaryDc && this._primaryDc !== dc) {
// todo clean up this._primaryDc.setIsPrimary(false)
return
} }
this._primaryDc = dc this._primaryDc = dc
dc.setIsPrimary(true)
dc.mainConnection.on('usable', () => { dc.main.on('usable', () => {
this._lastUpdateTime = Date.now() this._lastUpdateTime = Date.now()
if (this._keepAliveInterval) clearInterval(this._keepAliveInterval) if (this._keepAliveInterval) clearInterval(this._keepAliveInterval)
@ -276,19 +463,46 @@ export class NetworkManager {
} }
}, 60_000) }, 60_000)
}) })
dc.mainConnection.on('update', (update) => { dc.main.on('update', (update) => {
this._lastUpdateTime = Date.now() this._lastUpdateTime = Date.now()
this._updateHandler(update) this._updateHandler(update)
}) })
// dc.mainConnection.on('wait', () => // dc.mainConnection.on('wait', () =>
// this._cleanupPrimaryConnection() // this._cleanupPrimaryConnection()
// ) // )
dc.mainConnection.on('error', (err, conn) =>
this.params._emitError(err, conn), dc.main.on('error', (err, conn) => this.params._emitError(err, conn))
)
dc.loadKeys() dc.loadKeys()
.catch((e) => this.params._emitError(e)) .catch((e) => this.params._emitError(e))
.then(() => dc.mainConnection.connect()) .then(() => dc.main.ensureConnected())
}
async _getOtherDc(dcId: number): Promise<DcConnectionManager> {
if (!this._dcConnections[dcId]) {
this._log.debug('creating new DC %d', dcId)
const dcOption = await this.config.findOption({
dcId,
allowIpv6: this.params.useIpv6,
preferIpv6: this.params.useIpv6,
preferMedia: dcId !== this._primaryDc?.dcId,
cdn: false,
})
if (!dcOption) {
throw new Error(`Could not find DC ${dcId}`)
}
const dc = new DcConnectionManager(this, dcId, dcOption)
if (!(await dc.loadKeys())) {
dc.main.requestAuth()
}
this._dcConnections[dcId] = dc
}
return this._dcConnections[dcId]
} }
/** /**
@ -296,23 +510,219 @@ export class NetworkManager {
* *
* @param defaultDc Default DC to connect to * @param defaultDc Default DC to connect to
*/ */
connect(defaultDc: tl.RawDcOption): void { async connect(defaultDc: tl.RawDcOption): Promise<void> {
if (this._dcConnections[defaultDc.id]) { if (this._dcConnections[defaultDc.id]) {
// shouldn't happen // shouldn't happen
throw new Error('DC manager already exists') throw new Error('DC manager already exists')
} }
this._dcConnections[defaultDc.id] = new DcConnectionManager( const dc = new DcConnectionManager(this, defaultDc.id, defaultDc)
this, this._dcConnections[defaultDc.id] = dc
defaultDc.id, await this._switchPrimaryDc(dc)
defaultDc, }
private async _exportAuthTo(manager: DcConnectionManager): Promise<void> {
const auth = await this.call({
_: 'auth.exportAuthorization',
dcId: manager.dcId,
})
// manager.ensureMainConnection()
//
// if (!manager.main._sessions[0]._authKey.ready) {
// await manager.loadKeys()
// }
//
// manager.main.ensureConnected()
const res = await this.call(
{
_: 'auth.importAuthorization',
id: auth.id,
bytes: auth.bytes,
},
{ manager },
) )
this._switchPrimaryDc(this._dcConnections[defaultDc.id])
if (res._ !== 'auth.authorization') {
throw new Error(
`Unexpected response from auth.importAuthorization: ${res._}`,
)
}
}
async exportAuth(): Promise<void> {
const dcs: Record<number, number> = {}
const config = await this.config.get()
for (const dc of config.dcOptions) {
if (dc.cdn) continue
dcs[dc.id] = dc.id
}
for (const dc of Object.values(dcs)) {
if (dc === this._primaryDc!.dcId) continue
this._log.debug('exporting auth for dc %d', dc)
const manager = await this._getOtherDc(dc)
await this._exportAuthTo(manager)
}
}
async changePrimaryDc(newDc: number): Promise<void> {
if (newDc === this._primaryDc?.dcId) return
const option = await this.config.findOption({
dcId: newDc,
allowIpv6: this.params.useIpv6,
preferIpv6: this.params.useIpv6,
cdn: false,
})
if (!option) {
throw new Error(`DC ${newDc} not found`)
}
if (!this._dcConnections[newDc]) {
this._dcConnections[newDc] = new DcConnectionManager(
this,
newDc,
option,
)
}
this._storage.setDefaultDc(option)
this._switchPrimaryDc(this._dcConnections[newDc])
}
private _floodWaitedRequests: Record<string, number> = {}
async call<T extends tl.RpcMethod>(
message: T,
params?: RpcCallOptions,
stack?: string,
): Promise<tl.RpcCallReturn[T['_']]> {
if (!this._primaryDc) {
throw new Error('Not connected to any DC')
}
const floodSleepThreshold =
params?.floodSleepThreshold ?? this.params.floodSleepThreshold
const maxRetryCount = params?.maxRetryCount ?? this.params.maxRetryCount
// do not send requests that are in flood wait
if (message._ in this._floodWaitedRequests) {
const delta = this._floodWaitedRequests[message._] - Date.now()
if (delta <= 3000) {
// flood waits below 3 seconds are "ignored"
delete this._floodWaitedRequests[message._]
} else if (delta <= this.params.floodSleepThreshold) {
await sleep(delta)
delete this._floodWaitedRequests[message._]
} else {
throw new tl.errors.FloodWaitXError(delta / 1000)
}
}
let lastError: Error | null = null
const kind = params?.kind ?? 'main'
let manager: DcConnectionManager
if (params?.manager) {
manager = params.manager
} else if (params?.dcId && params.dcId !== this._primaryDc.dcId) {
manager = await this._getOtherDc(params.dcId)
} else {
manager = this._primaryDc
}
let multi = manager[kind]
for (let i = 0; i < maxRetryCount; i++) {
try {
const res = await multi.sendRpc(message, stack, params?.timeout)
if (kind === 'main') {
this._lastUpdateTime = Date.now()
}
return res
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} catch (e: any) {
lastError = e
if (e instanceof tl.errors.InternalError) {
this._log.warn('Telegram is having internal issues: %s', e)
if (e.message === 'WORKER_BUSY_TOO_LONG_RETRY') {
// according to tdlib, "it is dangerous to resend query without timeout, so use 1"
await sleep(1000)
}
continue
}
if (
e.constructor === tl.errors.FloodWaitXError ||
e.constructor === tl.errors.SlowmodeWaitXError ||
e.constructor === tl.errors.FloodTestPhoneWaitXError
) {
if (e.constructor !== tl.errors.SlowmodeWaitXError) {
// SLOW_MODE_WAIT is chat-specific, not request-specific
this._floodWaitedRequests[message._] =
Date.now() + e.seconds * 1000
}
// In test servers, FLOOD_WAIT_0 has been observed, and sleeping for
// such a short amount will cause retries very fast leading to issues
if (e.seconds === 0) {
(e as tl.Mutable<typeof e>).seconds = 1
}
if (e.seconds <= floodSleepThreshold) {
this._log.info('Flood wait for %d seconds', e.seconds)
await sleep(e.seconds * 1000)
continue
}
}
if (manager === this._primaryDc) {
if (
e.constructor === tl.errors.PhoneMigrateXError ||
e.constructor === tl.errors.UserMigrateXError ||
e.constructor === tl.errors.NetworkMigrateXError
) {
this._log.info('Migrate error, new dc = %d', e.new_dc)
await this.changePrimaryDc(e.new_dc)
manager = this._primaryDc!
multi = manager[kind]
continue
}
} else if (
e.constructor === tl.errors.AuthKeyUnregisteredError
) {
// we can try re-exporting auth from the primary connection
this._log.warn(
'exported auth key error, trying re-exporting..',
)
await this._exportAuthTo(manager)
continue
}
throw e
}
}
throw lastError
} }
destroy(): void { destroy(): void {
for (const dc of Object.values(this._dcConnections)) { for (const dc of Object.values(this._dcConnections)) {
dc.mainConnection.destroy() dc.main.destroy()
} }
if (this._keepAliveInterval) clearInterval(this._keepAliveInterval) if (this._keepAliveInterval) clearInterval(this._keepAliveInterval)
} }

View file

@ -2,10 +2,7 @@ import EventEmitter from 'events'
import { tl } from '@mtcute/tl' import { tl } from '@mtcute/tl'
import { import { ICryptoProvider, Logger } from '../utils'
ICryptoProvider,
Logger,
} from '../utils'
import { ReconnectionStrategy } from './reconnection' import { ReconnectionStrategy } from './reconnection'
import { import {
ITelegramTransport, ITelegramTransport,
@ -45,7 +42,7 @@ export abstract class PersistentConnection extends EventEmitter {
// inactivity timeout // inactivity timeout
private _inactivityTimeout: NodeJS.Timeout | null = null private _inactivityTimeout: NodeJS.Timeout | null = null
private _inactive = false private _inactive = true
_destroyed = false _destroyed = false
_usable = false _usable = false
@ -63,19 +60,14 @@ export abstract class PersistentConnection extends EventEmitter {
super() super()
this.params = params this.params = params
this.changeTransport(params.transportFactory) this.changeTransport(params.transportFactory)
this._updateLogPrefix()
this.log.prefix = `[UID ${this._uid}] `
this._onInactivityTimeout = this._onInactivityTimeout.bind(this)
} }
private _updateLogPrefix() { get isConnected(): boolean {
this.log.prefix = `[UID ${this._uid}, DC ${this.params.dc.id}] ` return this._transport.state() !== TransportState.Idle
}
async changeDc(dc: tl.RawDcOption): Promise<void> {
this.log.debug('dc changed to: %j', dc)
this.params.dc = dc
this._updateLogPrefix()
this.reconnect()
} }
changeTransport(factory: TransportFactory): void { changeTransport(factory: TransportFactory): void {
@ -139,7 +131,9 @@ export abstract class PersistentConnection extends EventEmitter {
this._previousWait = wait this._previousWait = wait
if (this._reconnectionTimeout != null) { clearTimeout(this._reconnectionTimeout) } if (this._reconnectionTimeout != null) {
clearTimeout(this._reconnectionTimeout)
}
this._reconnectionTimeout = setTimeout(() => { this._reconnectionTimeout = setTimeout(() => {
if (this._destroyed) return if (this._destroyed) return
this._reconnectionTimeout = null this._reconnectionTimeout = null
@ -148,10 +142,14 @@ export abstract class PersistentConnection extends EventEmitter {
} }
connect(): void { connect(): void {
if (this._transport.state() !== TransportState.Idle) { throw new Error('Connection is already opened!') } if (this.isConnected) {
throw new Error('Connection is already opened!')
}
if (this._destroyed) throw new Error('Connection is already destroyed!') if (this._destroyed) throw new Error('Connection is already destroyed!')
if (this._reconnectionTimeout != null) { clearTimeout(this._reconnectionTimeout) } if (this._reconnectionTimeout != null) {
clearTimeout(this._reconnectionTimeout)
}
this._inactive = false this._inactive = false
this._transport.connect(this.params.dc, this.params.testMode) this._transport.connect(this.params.dc, this.params.testMode)
@ -162,8 +160,12 @@ export abstract class PersistentConnection extends EventEmitter {
} }
destroy(): void { destroy(): void {
if (this._reconnectionTimeout != null) { clearTimeout(this._reconnectionTimeout) } if (this._reconnectionTimeout != null) {
if (this._inactivityTimeout != null) { clearTimeout(this._inactivityTimeout) } clearTimeout(this._reconnectionTimeout)
}
if (this._inactivityTimeout != null) {
clearTimeout(this._inactivityTimeout)
}
this._transport.close() this._transport.close()
this._transport.removeAllListeners() this._transport.removeAllListeners()
@ -173,7 +175,13 @@ export abstract class PersistentConnection extends EventEmitter {
protected _rescheduleInactivity(): void { protected _rescheduleInactivity(): void {
if (!this.params.inactivityTimeout) return if (!this.params.inactivityTimeout) return
if (this._inactivityTimeout) clearTimeout(this._inactivityTimeout) if (this._inactivityTimeout) clearTimeout(this._inactivityTimeout)
this._inactivityTimeout = setTimeout(() => { this._inactivityTimeout = setTimeout(
this._onInactivityTimeout,
this.params.inactivityTimeout,
)
}
protected _onInactivityTimeout(): void {
this.log.info( this.log.info(
'disconnected because of inactivity for %d', 'disconnected because of inactivity for %d',
this.params.inactivityTimeout, this.params.inactivityTimeout,
@ -181,7 +189,18 @@ export abstract class PersistentConnection extends EventEmitter {
this._inactive = true this._inactive = true
this._inactivityTimeout = null this._inactivityTimeout = null
this._transport.close() this._transport.close()
}, this.params.inactivityTimeout) }
setInactivityTimeout(timeout?: number): void {
this.params.inactivityTimeout = timeout
if (this._inactivityTimeout) {
clearTimeout(this._inactivityTimeout)
}
if (timeout) {
this._rescheduleInactivity()
}
} }
async send(data: Buffer): Promise<void> { async send(data: Buffer): Promise<void> {

View file

@ -102,12 +102,6 @@ export class SessionConnection extends PersistentConnection {
this._handleRawMessage = this._handleRawMessage.bind(this) this._handleRawMessage = this._handleRawMessage.bind(this)
} }
async changeDc(dc: tl.RawDcOption, authKey?: Buffer | null): Promise<void> {
this._session.reset()
await this._session._authKey.setup(authKey)
await super.changeDc(dc)
}
getAuthKey(temp = false): Buffer | null { getAuthKey(temp = false): Buffer | null {
const key = temp ? this._session._authKeyTemp : this._session._authKey const key = temp ? this._session._authKeyTemp : this._session._authKey
@ -135,10 +129,12 @@ export class SessionConnection extends PersistentConnection {
onTransportClose(): void { onTransportClose(): void {
super.onTransportClose() super.onTransportClose()
Object.values(this._pendingWaitForUnencrypted).forEach(([prom, timeout]) => { Object.values(this._pendingWaitForUnencrypted).forEach(
([prom, timeout]) => {
prom.reject(new Error('Connection closed')) prom.reject(new Error('Connection closed'))
clearTimeout(timeout) clearTimeout(timeout)
}) },
)
this.emit('disconnect') this.emit('disconnect')
@ -174,7 +170,9 @@ export class SessionConnection extends PersistentConnection {
this.log.info('no perm auth key, authorizing...') this.log.info('no perm auth key, authorizing...')
this._authorize() this._authorize()
// todo: if we use pfs, we can also start temp key exchange here // if we use pfs, we *could* also start temp key exchange here
// but telegram restricts us to only have one auth session per connection,
// and having a separate connection for pfs is not worth it
return return
} }
@ -217,7 +215,9 @@ export class SessionConnection extends PersistentConnection {
return return
} else if (this._isPfsBindingPending) { } else if (this._isPfsBindingPending) {
this.log.info('transport error 404, pfs binding in progress') this.log.info(
'transport error 404, pfs binding in progress',
)
this._onAllFailed('temp key expired, binding pending') this._onAllFailed('temp key expired, binding pending')
@ -338,7 +338,9 @@ export class SessionConnection extends PersistentConnection {
doAuthorization(this, this.params.crypto, TEMP_AUTH_KEY_EXPIRY) doAuthorization(this, this.params.crypto, TEMP_AUTH_KEY_EXPIRY)
.then(async ([tempAuthKey, tempServerSalt]) => { .then(async ([tempAuthKey, tempServerSalt]) => {
if (!this._usePfs) { if (!this._usePfs) {
this.log.info('pfs has been disabled while generating temp key') this.log.info(
'pfs has been disabled while generating temp key',
)
return return
} }
@ -397,7 +399,9 @@ export class SessionConnection extends PersistentConnection {
encryptedData, encryptedData,
]) ])
const promise = createControllablePromise<mtp.RawMt_rpc_error | boolean>() const promise = createControllablePromise<
mtp.RawMt_rpc_error | boolean
>()
// encrypt the message using temp key and same msg id // encrypt the message using temp key and same msg id
// this is a bit of a hack, but it works // this is a bit of a hack, but it works
@ -449,7 +453,9 @@ export class SessionConnection extends PersistentConnection {
this._session.pendingMessages.delete(msgId) this._session.pendingMessages.delete(msgId)
if (!this._usePfs) { if (!this._usePfs) {
this.log.info('pfs has been disabled while binding temp key') this.log.info(
'pfs has been disabled while binding temp key',
)
return return
} }
@ -526,7 +532,8 @@ export class SessionConnection extends PersistentConnection {
// auth_key_id = 0, meaning it's an unencrypted message used for authorization // auth_key_id = 0, meaning it's an unencrypted message used for authorization
if (this._pendingWaitForUnencrypted.length) { if (this._pendingWaitForUnencrypted.length) {
const [promise, timeout] = this._pendingWaitForUnencrypted.shift()! const [promise, timeout] =
this._pendingWaitForUnencrypted.shift()!
clearTimeout(timeout) clearTimeout(timeout)
promise.resolve(data) promise.resolve(data)
} else { } else {
@ -583,25 +590,19 @@ export class SessionConnection extends PersistentConnection {
for (let i = 0; i < count; i++) { for (let i = 0; i < count; i++) {
// msg_id:long seqno:int bytes:int // msg_id:long seqno:int bytes:int
const msgId = message.long() const msgId = message.long()
message.uint() // seqno const seqNo = message.uint() // seqno
const length = message.uint() const length = message.uint()
// container can't contain other containers, so we are safe // container can't contain other containers, but can contain rpc_result
const start = message.pos const obj = message.raw(length)
const obj = message.object()
// ensure length this._handleRawMessage(
if (message.pos - start !== length) { msgId,
this.log.warn( seqNo,
'received message with invalid length in container (%d != %d)', new TlBinaryReader(this._readerMap, obj),
message.pos - start,
length,
) )
} }
this._handleMessage(msgId, obj)
}
return return
} }
@ -609,6 +610,8 @@ export class SessionConnection extends PersistentConnection {
// rpc_result // rpc_result
message.uint() message.uint()
this._sendAck(messageId)
return this._onRpcResult(message) return this._onRpcResult(message)
} }
@ -743,7 +746,7 @@ export class SessionConnection extends PersistentConnection {
resultType = message.peekUint() resultType = message.peekUint()
} }
this.log.warn( this.log.warn(
'received rpc_result with %s with req_msg_id = 0', 'received rpc_result with %j with req_msg_id = 0',
resultType, resultType,
) )
@ -760,15 +763,11 @@ export class SessionConnection extends PersistentConnection {
} catch (err) { } catch (err) {
result = '[failed to parse]' result = '[failed to parse]'
} }
this.log.warn(
'received rpc_result with %s with req_msg_id = 0',
result,
)
// check if the msg is one of the recent ones // check if the msg is one of the recent ones
if (this._session.recentOutgoingMsgIds.has(reqMsgId)) { if (this._session.recentOutgoingMsgIds.has(reqMsgId)) {
this.log.debug( this.log.debug(
'received rpc_result again for %l (contains %s)', 'received rpc_result again for %l (contains %j)',
reqMsgId, reqMsgId,
result, result,
) )
@ -786,7 +785,7 @@ export class SessionConnection extends PersistentConnection {
// special case for auth key binding // special case for auth key binding
if (msg._ !== 'rpc') { if (msg._ !== 'rpc') {
if (msg._ === 'bind') { if (msg._ === 'bind') {
msg.promise.resolve(result) msg.promise.resolve(message.object())
return return
} }
@ -1387,7 +1386,9 @@ export class SessionConnection extends PersistentConnection {
} }
private _enqueueRpc(rpc: PendingRpc, force?: boolean) { private _enqueueRpc(rpc: PendingRpc, force?: boolean) {
if (this._session.enqueueRpc(rpc, force)) { this._flushTimer.emitWhenIdle() } if (this._session.enqueueRpc(rpc, force)) {
this._flushTimer.emitWhenIdle()
}
} }
_resetSession(): void { _resetSession(): void {
@ -1575,18 +1576,57 @@ export class SessionConnection extends PersistentConnection {
} }
} }
private _flush(): void { private get _hasPendingServiceMessages(): boolean {
return Boolean(
this._session.queuedRpc.length ||
this._session.queuedAcks.length ||
this._session.queuedStateReq.length ||
this._session.queuedResendReq.length,
)
}
protected _onInactivityTimeout() {
// we should send all pending acks and other service messages
// before dropping the connection
if (!this._hasPendingServiceMessages) {
this.log.debug('no pending service messages, closing connection')
super._onInactivityTimeout()
return
}
this._flush(() => {
if (this._hasPendingServiceMessages) {
// the callback will be called again once all pending messages are sent
return
}
this.log.debug('pending service messages sent, closing connection')
this._flushTimer.reset()
super._onInactivityTimeout()
})
}
private _flush(callback?: () => void): void {
if ( if (
!this._session._authKey.ready || !this._session._authKey.ready ||
this._isPfsBindingPending || this._isPfsBindingPending ||
this._current429Timeout this._current429Timeout
) { ) {
this.log.debug(
'skipping flush, connection is not usable (auth key ready = %b, pfs binding pending = %b, 429 timeout = %b)',
this._session._authKey.ready,
this._isPfsBindingPending,
Boolean(this._current429Timeout),
)
// it will be flushed once connection is usable // it will be flushed once connection is usable
return return
} }
try { try {
this._doFlush() this._doFlush(callback)
} catch (e: any) { } catch (e: any) {
this.log.error('flush error: %s', e.stack) this.log.error('flush error: %s', e.stack)
// should not happen unless there's a bug in the code // should not happen unless there's a bug in the code
@ -1601,13 +1641,13 @@ export class SessionConnection extends PersistentConnection {
this._session.queuedStateReq.length || this._session.queuedStateReq.length ||
this._session.queuedResendReq.length this._session.queuedResendReq.length
) { ) {
this._flush() this._flush(callback)
} else { } else {
this._flushTimer.emitBefore(this._lastPingTime + 60000) this._flushTimer.emitBefore(this._lastPingTime + 60000)
} }
} }
private _doFlush(): void { private _doFlush(callback?: () => void): void {
this.log.debug( this.log.debug(
'flushing send queue. queued rpc: %d', 'flushing send queue. queued rpc: %d',
this._session.queuedRpc.length, this._session.queuedRpc.length,
@ -2015,6 +2055,7 @@ export class SessionConnection extends PersistentConnection {
this._session this._session
.encryptMessage(result) .encryptMessage(result)
.then((enc) => this.send(enc)) .then((enc) => this.send(enc))
.then(callback)
.catch((err) => { .catch((err) => {
this.log.error( this.log.error(
'error while sending pending messages (root msg_id = %l): %s', 'error while sending pending messages (root msg_id = %l): %s',
@ -2023,7 +2064,9 @@ export class SessionConnection extends PersistentConnection {
) )
// put acks in the front so they are the first to be sent // put acks in the front so they are the first to be sent
if (ackMsgIds) { this._session.queuedAcks.splice(0, 0, ...ackMsgIds) } if (ackMsgIds) {
this._session.queuedAcks.splice(0, 0, ...ackMsgIds)
}
this._onMessageFailed(rootMsgId, 'unknown error') this._onMessageFailed(rootMsgId, 'unknown error')
}) })
} }

View file

@ -2,7 +2,7 @@ import { isatty } from 'tty'
const isTty = isatty(process.stdout.fd) const isTty = isatty(process.stdout.fd)
const BASE_FORMAT = isTty ? '[%s] [%s] %s%s\x1b[0m - ' : '[%s] [%s] %s - ' const BASE_FORMAT = isTty ? '%s [%s] [%s%s\x1b[0m] ' : '%s [%s] [%s] '
const LEVEL_NAMES = isTty ? const LEVEL_NAMES = isTty ?
[ [
'', // OFF '', // OFF

View file

@ -1,4 +1,4 @@
const BASE_FORMAT = '[%s] [%с%s%с] %c%s%c - ' const BASE_FORMAT = '%s [%с%s%с] [%c%s%c] '
const LEVEL_NAMES = [ const LEVEL_NAMES = [
'', // OFF '', // OFF
'ERR', 'ERR',