diff --git a/packages/client/src/client.ts b/packages/client/src/client.ts index b214cc3e..6f8fd260 100644 --- a/packages/client/src/client.ts +++ b/packages/client/src/client.ts @@ -131,7 +131,6 @@ import { getMessages } from './methods/messages/get-messages' import { getMessagesUnsafe } from './methods/messages/get-messages-unsafe' import { getReactionUsers } from './methods/messages/get-reaction-users' import { getScheduledMessages } from './methods/messages/get-scheduled-messages' -import { _normalizeInline } from './methods/messages/normalize-inline' import { _parseEntities } from './methods/messages/parse-entities' import { pinMessage } from './methods/messages/pin-message' import { readHistory } from './methods/messages/read-history' @@ -2759,10 +2758,6 @@ export interface TelegramClient extends BaseTelegramClient { messageIds: number[] ): Promise<(Message | null)[]> - _normalizeInline( - id: string | tl.TypeInputBotInlineMessageID - ): Promise<[tl.TypeInputBotInlineMessageID, SessionConnection]> - _parseEntities( text?: string | FormattedString, mode?: string | null, @@ -4025,7 +4020,6 @@ export class TelegramClient extends BaseTelegramClient { protected _pendingConversations: Record protected _hasConversations: boolean protected _downloadConnections: Record - protected _connectionsForInline: Record protected _parseModes: Record protected _defaultParseMode: string | null protected _updatesLoopActive: boolean @@ -4061,7 +4055,6 @@ export class TelegramClient extends BaseTelegramClient { this._pendingConversations = {} this._hasConversations = false this._downloadConnections = {} - this._connectionsForInline = {} this._parseModes = {} this._defaultParseMode = null this._updatesLoopActive = false @@ -4213,7 +4206,6 @@ export class TelegramClient extends BaseTelegramClient { getMessages = getMessages getReactionUsers = getReactionUsers getScheduledMessages = getScheduledMessages - _normalizeInline = _normalizeInline _parseEntities = _parseEntities pinMessage = pinMessage readHistory = readHistory diff --git a/packages/client/src/methods/bots/get-game-high-scores.ts b/packages/client/src/methods/bots/get-game-high-scores.ts index 5759c5d4..803f2feb 100644 --- a/packages/client/src/methods/bots/get-game-high-scores.ts +++ b/packages/client/src/methods/bots/get-game-high-scores.ts @@ -1,12 +1,8 @@ import { tl } from '@mtcute/tl' import { TelegramClient } from '../../client' -import { - GameHighScore, - InputPeerLike, - MtInvalidPeerTypeError, - PeersIndex, -} from '../../types' +import { GameHighScore, InputPeerLike, PeersIndex } from '../../types' +import { normalizeInlineId } from '../../utils/inline-utils' import { normalizeToInputUser } from '../../utils/peer-utils' /** @@ -57,7 +53,7 @@ export async function getInlineGameHighScores( messageId: string | tl.TypeInputBotInlineMessageID, userId?: InputPeerLike, ): Promise { - const [id, connection] = await this._normalizeInline(messageId) + const id = await normalizeInlineId(messageId) let user: tl.TypeInputUser @@ -73,7 +69,7 @@ export async function getInlineGameHighScores( id, userId: user, }, - { connection }, + { dcId: id.dcId }, ) const peers = PeersIndex.from(res) diff --git a/packages/client/src/methods/bots/set-game-score.ts b/packages/client/src/methods/bots/set-game-score.ts index e95351c0..091ab765 100644 --- a/packages/client/src/methods/bots/set-game-score.ts +++ b/packages/client/src/methods/bots/set-game-score.ts @@ -2,6 +2,7 @@ import { tl } from '@mtcute/tl' import { TelegramClient } from '../../client' import { InputPeerLike, Message, MtInvalidPeerTypeError } from '../../types' +import { normalizeInlineId } from '../../utils/inline-utils' import { normalizeToInputUser } from '../../utils/peer-utils' /** @@ -86,7 +87,7 @@ export async function setInlineGameScore( const user = normalizeToInputUser(await this.resolvePeer(userId), userId) - const [id, connection] = await this._normalizeInline(messageId) + const id = await normalizeInlineId(messageId) await this.call( { @@ -97,6 +98,6 @@ export async function setInlineGameScore( editMessage: !params.noEdit, force: params.force, }, - { connection }, + { dcId: id.dcId }, ) } diff --git a/packages/client/src/methods/messages/edit-inline-message.ts b/packages/client/src/methods/messages/edit-inline-message.ts index b2691da4..94363b14 100644 --- a/packages/client/src/methods/messages/edit-inline-message.ts +++ b/packages/client/src/methods/messages/edit-inline-message.ts @@ -7,6 +7,7 @@ import { InputMediaLike, ReplyMarkup, } from '../../types' +import { normalizeInlineId } from '../../utils/inline-utils' /** * Edit sent inline message text, media and reply markup. @@ -75,7 +76,7 @@ export async function editInlineMessage( let entities: tl.TypeMessageEntity[] | undefined let media: tl.TypeInputMedia | undefined = undefined - const [id, connection] = await this._normalizeInline(messageId) + const id = await normalizeInlineId(messageId) if (params.media) { media = await this._normalizeInputMedia(params.media, params, true) @@ -111,7 +112,7 @@ export async function editInlineMessage( entities, media, }, - { connection }, + { dcId: id.dcId }, ) return diff --git a/packages/client/src/methods/messages/normalize-inline.ts b/packages/client/src/methods/messages/normalize-inline.ts deleted file mode 100644 index dc611f66..00000000 --- a/packages/client/src/methods/messages/normalize-inline.ts +++ /dev/null @@ -1,39 +0,0 @@ -import { SessionConnection } from '@mtcute/core' -import { tl } from '@mtcute/tl' - -import { TelegramClient } from '../../client' -import { parseInlineMessageId } from '../../utils/inline-utils' - -// @extension -interface InlineExtension { - _connectionsForInline: Record -} - -// @initialize -function _initializeInline(this: TelegramClient) { - this._connectionsForInline = {} -} - -/** @internal */ -export async function _normalizeInline( - this: TelegramClient, - id: string | tl.TypeInputBotInlineMessageID, -): Promise<[tl.TypeInputBotInlineMessageID, SessionConnection]> { - if (typeof id === 'string') { - id = parseInlineMessageId(id) - } - - // let connection = this.primaryConnection - // - // if (id.dcId !== connection.params.dc.id) { - // if (!(id.dcId in this._connectionsForInline)) { - // this._connectionsForInline[id.dcId] = - // await this.createAdditionalConnection(id.dcId) - // } - // connection = this._connectionsForInline[id.dcId] - // } - // - // return [id, connection] - // fixme - throw new Error('TODO') -} diff --git a/packages/client/src/types/misc/takeout-session.ts b/packages/client/src/types/misc/takeout-session.ts index da73a358..af8f8af6 100644 --- a/packages/client/src/types/misc/takeout-session.ts +++ b/packages/client/src/types/misc/takeout-session.ts @@ -1,4 +1,4 @@ -import { MustEqual } from '@mtcute/core' +import { MustEqual, RpcCallOptions } from '@mtcute/core' import { tl } from '@mtcute/tl' import { TelegramClient } from '../../client' @@ -31,9 +31,7 @@ export class TakeoutSession { */ async call( message: MustEqual, - params?: { - throwFlood: boolean - }, + params?: RpcCallOptions, ): Promise { return this.client.call( { diff --git a/packages/client/src/utils/inline-utils.ts b/packages/client/src/utils/inline-utils.ts index cd3d32c2..297d2f9b 100644 --- a/packages/client/src/utils/inline-utils.ts +++ b/packages/client/src/utils/inline-utils.ts @@ -66,3 +66,11 @@ export function encodeInlineMessageId( return encodeUrlSafeBase64(writer.result()) } + +export function normalizeInlineId(id: string | tl.TypeInputBotInlineMessageID) { + if (typeof id === 'string') { + return parseInlineMessageId(id) + } + + return id +} diff --git a/packages/core/src/base-client.ts b/packages/core/src/base-client.ts index 949857ef..ab65f13f 100644 --- a/packages/core/src/base-client.ts +++ b/packages/core/src/base-client.ts @@ -13,7 +13,11 @@ import { TransportFactory, } from './network' import { ConfigManager } from './network/config-manager' -import { NetworkManager, NetworkManagerExtraParams } from './network/network-manager' +import { + NetworkManager, + NetworkManagerExtraParams, + RpcCallOptions, +} from './network/network-manager' import { PersistentConnectionParams } from './network/persistent-connection' import { ITelegramStorage, MemoryStorage } from './storage' import { MustEqual } from './types' @@ -30,7 +34,6 @@ import { ICryptoProvider, LogManager, readStringSession, - sleep, toggleChannelIdMark, writeStringSession, } from './utils' @@ -64,27 +67,27 @@ export interface BaseTelegramClientOptions { */ useIpv6?: boolean - /** - * Primary DC to use for initial connection. - * This does not mean this will be the only DC used, - * nor that this DC will actually be primary, this only - * determines the first DC the library will try to connect to. - * Can be used to connect to other networks (like test DCs). - * - * When session already contains primary DC, this parameter is ignored. - * Defaults to Production DC 2. - */ - defaultDc?: tl.RawDcOption + /** + * Primary DC to use for initial connection. + * This does not mean this will be the only DC used, + * nor that this DC will actually be primary, this only + * determines the first DC the library will try to connect to. + * Can be used to connect to other networks (like test DCs). + * + * When session already contains primary DC, this parameter is ignored. + * Defaults to Production DC 2. + */ + defaultDc?: tl.RawDcOption - /** - * Whether to connect to test servers. - * - * If passed, {@link defaultDc} defaults to Test DC 2. - * - * **Must** be passed if using test servers, even if - * you passed custom {@link defaultDc} - */ - testMode?: boolean + /** + * Whether to connect to test servers. + * + * If passed, {@link defaultDc} defaults to Test DC 2. + * + * **Must** be passed if using test servers, even if + * you passed custom {@link defaultDc} + */ + testMode?: boolean /** * Additional options for initConnection call. @@ -123,7 +126,7 @@ export interface BaseTelegramClientOptions { * * @default 5 */ - rpcRetryCount?: number + maxRetryCount?: number /** * If true, every single API call will be wrapped with `tl.invokeWithoutUpdates`, @@ -152,19 +155,19 @@ export interface BaseTelegramClientOptions { */ niceStacks?: boolean - /** - * Extra parameters for {@link NetworkManager} - */ - network?: NetworkManagerExtraParams + /** + * Extra parameters for {@link NetworkManager} + */ + network?: NetworkManagerExtraParams - /** - * **EXPERT USE ONLY!** - * - * Override TL layer used for the connection. - * - * **Does not** change the schema used. - */ - overrideLayer?: number + /** + * **EXPERT USE ONLY!** + * + * Override TL layer used for the connection. + * + * **Does not** change the schema used. + */ + overrideLayer?: number /** * **EXPERT USE ONLY** @@ -207,16 +210,6 @@ export class BaseTelegramClient extends EventEmitter { */ protected readonly _testMode: boolean - /** - * Flood sleep threshold taken from {@link BaseTelegramClientOptions.floodSleepThreshold} - */ - protected readonly _floodSleepThreshold: number - - /** - * RPC retry count taken from {@link BaseTelegramClientOptions.rpcRetryCount} - */ - protected readonly _rpcRetryCount: number - /** * Primary DC taken from {@link BaseTelegramClientOptions.defaultDc}, * loaded from session or changed by other means (like redirecting). @@ -229,7 +222,6 @@ export class BaseTelegramClient extends EventEmitter { readonly _writerMap: TlWriterMap protected _lastUpdateTime = 0 - private _floodWaitedRequests: Record = {} protected _config = new ConfigManager(() => this.call({ _: 'help.getConfig' }), @@ -287,30 +279,35 @@ export class BaseTelegramClient extends EventEmitter { } this._defaultDc = dc - this._floodSleepThreshold = opts.floodSleepThreshold ?? 10000 - this._rpcRetryCount = opts.rpcRetryCount ?? 5 this._niceStacks = opts.niceStacks ?? true this._layer = opts.overrideLayer ?? tl.LAYER this._readerMap = opts.readerMap ?? defaultReaderMap this._writerMap = opts.writerMap ?? defaultWriterMap - this.network = new NetworkManager({ - apiId, - crypto: this._crypto, - disableUpdates: opts.disableUpdates ?? false, - initConnectionOptions: opts.initConnectionOptions, - layer: this._layer, - log: this.log, - readerMap: this._readerMap, - writerMap: this._writerMap, - reconnectionStrategy: opts.reconnectionStrategy, - storage: this.storage, - testMode: this._testMode, - transport: opts.transport, - _emitError: this._emitError.bind(this), - ...(opts.network ?? {}), - }, this._config) + this.network = new NetworkManager( + { + apiId, + crypto: this._crypto, + disableUpdates: opts.disableUpdates ?? false, + initConnectionOptions: opts.initConnectionOptions, + layer: this._layer, + log: this.log, + readerMap: this._readerMap, + writerMap: this._writerMap, + reconnectionStrategy: opts.reconnectionStrategy, + storage: this.storage, + testMode: this._testMode, + transport: opts.transport, + _emitError: this._emitError.bind(this), + floodSleepThreshold: opts.floodSleepThreshold ?? 10000, + maxRetryCount: opts.maxRetryCount ?? 5, + isPremium: false, // todo fixme + useIpv6: Boolean(opts.useIpv6), + ...(opts.network ?? {}), + }, + this._config, + ) this.storage.setup?.(this.log, this._readerMap, this._writerMap) } @@ -338,25 +335,27 @@ export class BaseTelegramClient extends EventEmitter { return } - this._connected = createControllablePromise() + const promise = (this._connected = createControllablePromise()) await this._loadStorage() const primaryDc = await this.storage.getDefaultDc() if (primaryDc !== null) this._defaultDc = primaryDc - const defaultDcAuthKey = await this.storage.getAuthKeyFor(this._defaultDc.id) + const defaultDcAuthKey = await this.storage.getAuthKeyFor( + this._defaultDc.id, + ) - // await this.primaryConnection.setupKeys() - - if ( - (this._importForce || !defaultDcAuthKey) && - this._importFrom - ) { + if ((this._importForce || !defaultDcAuthKey) && this._importFrom) { const data = readStringSession(this._readerMap, this._importFrom) - if (data.testMode !== !this._testMode) { + if (data.testMode !== this._testMode) { throw new Error( - 'This session string is not for the current backend', + 'This session string is not for the current backend. ' + + `Session is ${ + data.testMode ? 'test' : 'prod' + }, but the client is ${ + this._testMode ? 'test' : 'prod' + }`, ) } @@ -373,10 +372,13 @@ export class BaseTelegramClient extends EventEmitter { await this._saveStorage(true) } - this.network.connect(this._defaultDc) - - this._connected.resolve() - this._connected = true + this.network + .connect(this._defaultDc) + .then(() => { + promise.resolve() + this._connected = true + }) + .catch((err) => this._emitError(err)) } /** @@ -449,122 +451,18 @@ export class BaseTelegramClient extends EventEmitter { */ async call( message: MustEqual, - params?: { - throwFlood?: boolean - connection?: SessionConnection - timeout?: number - }, + params?: RpcCallOptions, ): Promise { - // todo move to network manager if (this._connected !== true) { await this.connect() } - // do not send requests that are in flood wait - if (message._ in this._floodWaitedRequests) { - const delta = this._floodWaitedRequests[message._] - Date.now() - - if (delta <= 3000) { - // flood waits below 3 seconds are "ignored" - delete this._floodWaitedRequests[message._] - } else if (delta <= this._floodSleepThreshold) { - await sleep(delta) - delete this._floodWaitedRequests[message._] - } else { - throw new tl.errors.FloodWaitXError(delta / 1000) - } - } - - let lastError: Error | null = null const stack = this._niceStacks ? new Error().stack : undefined - for (let i = 0; i < this._rpcRetryCount; i++) { - try { - // fixme temporary hack - // eslint-disable-next-line dot-notation - const res = await this.network['_primaryDc']!.mainConnection.sendRpc( - message, - stack, - params?.timeout, - ) - await this._cachePeersFrom(res) + const res = await this.network.call(message, params, stack) + await this._cachePeersFrom(res) - return res - } catch (e: any) { - lastError = e - - if (e instanceof tl.errors.InternalError) { - this.log.warn('Telegram is having internal issues: %s', e) - - if (e.message === 'WORKER_BUSY_TOO_LONG_RETRY') { - // according to tdlib, "it is dangerous to resend query without timeout, so use 1" - await sleep(1000) - } - continue - } - - if ( - e.constructor === tl.errors.FloodWaitXError || - e.constructor === tl.errors.SlowmodeWaitXError || - e.constructor === tl.errors.FloodTestPhoneWaitXError - ) { - if (e.constructor !== tl.errors.SlowmodeWaitXError) { - // SLOW_MODE_WAIT is chat-specific, not request-specific - this._floodWaitedRequests[message._] = - Date.now() + e.seconds * 1000 - } - - // In test servers, FLOOD_WAIT_0 has been observed, and sleeping for - // such a short amount will cause retries very fast leading to issues - if (e.seconds === 0) { - (e as any).seconds = 1 - } - - if ( - params?.throwFlood !== true && - e.seconds <= this._floodSleepThreshold - ) { - this.log.info('Flood wait for %d seconds', e.seconds) - await sleep(e.seconds * 1000) - continue - } - } - - // if (connection.params.dc.id === this._defaultDc.id) { - // if ( - // e.constructor === tl.errors.PhoneMigrateXError || - // e.constructor === tl.errors.UserMigrateXError || - // e.constructor === tl.errors.NetworkMigrateXError - // ) { - // this.log.info('Migrate error, new dc = %d', e.new_dc) - // await this.changeDc(e.new_dc) - // continue - // } - // } else { - // if (e.constructor === tl.errors.AuthKeyUnregisteredError) { - // // we can try re-exporting auth from the primary connection - // this.log.warn('exported auth key error, re-exporting..') - // - // const auth = await this.call({ - // _: 'auth.exportAuthorization', - // dcId: connection.params.dc.id, - // }) - // - // await connection.sendRpc({ - // _: 'auth.importAuthorization', - // id: auth.id, - // bytes: auth.bytes, - // }) - // - // continue - // } - // } - - throw e - } - } - - throw lastError + return res } // /** @@ -707,7 +605,9 @@ export class BaseTelegramClient extends EventEmitter { * the connection in which the error has occurred, in case * this was connection-related error. */ - onError(handler: typeof this._onError): void { + onError( + handler: (err: unknown, connection?: SessionConnection) => void, + ): void { this._onError = handler } @@ -816,16 +716,18 @@ export class BaseTelegramClient extends EventEmitter { * > with [@BotFather](//t.me/botfather) */ async exportSession(): Promise { - // todo - // if (!this.primaryConnection.getAuthKey()) - // throw new Error('Auth key is not generated yet') + const primaryDc = await this.storage.getDefaultDc() + if (!primaryDc) throw new Error('No default DC set') + + const authKey = await this.storage.getAuthKeyFor(primaryDc.id) + if (!authKey) throw new Error('Auth key is not ready yet') return writeStringSession(this._writerMap, { version: 1, self: await this.storage.getSelf(), testMode: this._testMode, - primaryDc: this._defaultDc, - authKey: Buffer.from([]), //this.primaryConnection.getAuthKey()!, + primaryDc, + authKey, }) } @@ -837,7 +739,7 @@ export class BaseTelegramClient extends EventEmitter { * * Also note that the session will only be imported in case * the storage is missing authorization (i.e. does not contain - * auth key for the primary DC), otherwise it will be ignored. + * auth key for the primary DC), otherwise it will be ignored (unless `force). * * @param session Session string to import * @param force Whether to overwrite existing session diff --git a/packages/core/src/network/config-manager.ts b/packages/core/src/network/config-manager.ts index 35480e12..8fd520e0 100644 --- a/packages/core/src/network/config-manager.ts +++ b/packages/core/src/network/config-manager.ts @@ -70,12 +70,11 @@ export class ConfigManager { if (this.isStale) await this.update() const options = this._config!.dcOptions.filter((opt) => { - if (opt.id === params.dcId) return true + if (opt.tcpoOnly) return false // unsupported if (opt.ipv6 && !params.allowIpv6) return false if (opt.cdn && !params.cdn) return false - if (opt.tcpoOnly) return false // unsupported - return true + return opt.id === params.dcId }) if (params.preferMedia && params.preferIpv6) { diff --git a/packages/core/src/network/index.ts b/packages/core/src/network/index.ts index 77d7ebd6..34bac818 100644 --- a/packages/core/src/network/index.ts +++ b/packages/core/src/network/index.ts @@ -1,3 +1,4 @@ +export { NetworkManagerExtraParams, RpcCallOptions } from './network-manager' export * from './reconnection' export * from './session-connection' export * from './transports' diff --git a/packages/core/src/network/multi-session-connection.ts b/packages/core/src/network/multi-session-connection.ts index d18e1634..da8ec79b 100644 --- a/packages/core/src/network/multi-session-connection.ts +++ b/packages/core/src/network/multi-session-connection.ts @@ -18,9 +18,11 @@ export class MultiSessionConnection extends EventEmitter { readonly params: SessionConnectionParams, private _count: number, log: Logger, + logPrefix = '', ) { super() this._log = log.create('multi') + if (logPrefix) this._log.prefix = `[${logPrefix}] ` this._enforcePfs = _count > 1 && params.isMainConnection this._sessions = [] @@ -183,9 +185,13 @@ export class MultiSessionConnection extends EventEmitter { } } + _destroyed = false destroy(): void { this._connections.forEach((conn) => conn.destroy()) this._sessions.forEach((sess) => sess.reset()) + this.removeAllListeners() + + this._destroyed = true } private _nextConnection = 0 @@ -222,18 +228,18 @@ export class MultiSessionConnection extends EventEmitter { ].sendRpc(request, stack, timeout) } - async changeDc(dc: tl.RawDcOption, authKey?: Buffer | null): Promise { - await Promise.all( - this._connections.map((conn) => conn.changeDc(dc, authKey)), - ) - } - connect(): void { for (const conn of this._connections) { conn.connect() } } + ensureConnected(): void { + if (this._connections[0].isConnected) return + + this.connect() + } + async setAuthKey( authKey: Buffer | null, temp = false, @@ -244,6 +250,41 @@ export class MultiSessionConnection extends EventEmitter { await key.setup(authKey) } + setInactivityTimeout(timeout?: number): void { + this._log.debug('setting inactivity timeout to %s', timeout) + + // for future connections (if any) + this.params.inactivityTimeout = timeout + + // for current connections + for (const conn of this._connections) { + conn.setInactivityTimeout(timeout) + } + } + + notifyKeyChange(): void { + // only expected to be called on non-main connections + const session = this._sessions[0] + + if (this.params.usePfs && !session._authKeyTemp.ready) { + this._log.debug( + 'temp auth key needed but not ready, ignoring key change', + ) + + return + } + + if (this._sessions[0].queuedRpc.length) { + // there are pending requests, we need to reconnect. + this._log.debug( + 'notifying key change on the connection due to queued rpc', + ) + this._connections[0].onConnected() + } + + // connection is idle, we don't need to notify it + } + requestAuth(): void { this._connections[0]._authorize() } diff --git a/packages/core/src/network/network-manager.ts b/packages/core/src/network/network-manager.ts index 49f3e0a9..4825c8aa 100644 --- a/packages/core/src/network/network-manager.ts +++ b/packages/core/src/network/network-manager.ts @@ -2,7 +2,7 @@ import { tl } from '@mtcute/tl' import { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime' import { ITelegramStorage } from '../storage' -import { ICryptoProvider, Logger } from '../utils' +import { ICryptoProvider, Logger, sleep } from '../utils' import { ConfigManager } from './config-manager' import { MultiSessionConnection } from './multi-session-connection' import { PersistentConnectionParams } from './persistent-connection' @@ -16,7 +16,7 @@ import { } from './session-connection' import { defaultTransportFactory, TransportFactory } from './transports' -export type ConnectionKind = 'main' | 'upload' | 'download' | 'download-small' +export type ConnectionKind = 'main' | 'upload' | 'download' | 'downloadSmall' /** * Params passed into {@link NetworkManager} by {@link TelegramClient}. @@ -33,15 +33,40 @@ export interface NetworkManagerParams { > transport?: TransportFactory reconnectionStrategy?: ReconnectionStrategy - + floodSleepThreshold: number + maxRetryCount: number disableUpdates?: boolean testMode: boolean layer: number + useIpv6: boolean readerMap: TlReaderMap writerMap: TlWriterMap + isPremium: boolean _emitError: (err: Error, connection?: SessionConnection) => void } +export type ConnectionCountDelegate = ( + kind: ConnectionKind, + dcId: number, + isPremium: boolean +) => number + +const defaultConnectionCountDelegate: ConnectionCountDelegate = ( + kind, + dcId, + isPremium, +) => { + switch (kind) { + case 'main': + return 1 + case 'upload': + return isPremium || (dcId !== 2 && dcId !== 4) ? 8 : 4 + case 'download': + case 'downloadSmall': + return isPremium ? 8 : 2 + } +} + /** * Additional params passed into {@link NetworkManager} by the user * that customize the behavior of the manager @@ -55,8 +80,65 @@ export interface NetworkManagerExtraParams { /** * Connection count for each connection kind + * + * Defaults to TDLib logic: + * - main: 1 (should not be changed manually) + * - upload: if premium or dc id is other than 2 or 4, then 8, otherwise 4 + * - download: if premium then 8, otherwise 2 + * - downloadSmall: if premium then 8, otherwise 2 */ - connectionCount?: Partial> + connectionCount?: ConnectionCountDelegate + + /** + * Idle timeout for non-main connections, in ms + * Defaults to 60 seconds. + */ + inactivityTimeout?: number +} + +export interface RpcCallOptions { + /** + * If the call results in a `FLOOD_WAIT_X` error, + * the maximum amount of time to wait before retrying. + * + * If set to `0`, the call will not be retried. + * + * @default {@link BaseTelegramClientOptions.floodSleepThreshold} + */ + floodSleepThreshold?: number + + /** + * If the call results in an internal server error or a flood wait, + * the maximum amount of times to retry the call. + * + * @default {@link BaseTelegramClientOptions.maxRetryCount} + */ + maxRetryCount?: number + + /** + * Timeout for the call, in milliseconds. + * + * @default Infinity + */ + timeout?: number + + /** + * Kind of connection to use for this call. + * + * @default 'main' + */ + kind?: ConnectionKind + + /** + * ID of the DC to use for this call + */ + dcId?: number + + /** + * DC connection manager to use for this call. + * Overrides `dcId` if set. + */ + manager?: DcConnectionManager } export class DcConnectionManager { @@ -73,35 +155,87 @@ export class DcConnectionManager { writerMap: this.manager.params.writerMap, usePfs: this.manager.params.usePfs, isMainConnection: false, + inactivityTimeout: this.manager.params.inactivityTimeout ?? 60_000, }) - mainConnection = new MultiSessionConnection( - { - ...this.__baseConnectionParams(), - isMainConnection: true, - }, - this.manager.params.connectionCount?.main ?? 1, - this.manager._log, + private _log = this.manager._log.create('dc-manager') + + main: MultiSessionConnection + + upload = new MultiSessionConnection( + this.__baseConnectionParams(), + this.manager._connectionCount( + 'upload', + this._dc.id, + this.manager.params.isPremium, + ), + this._log, + 'UPLOAD', + ) + + download = new MultiSessionConnection( + this.__baseConnectionParams(), + this.manager._connectionCount( + 'download', + this._dc.id, + this.manager.params.isPremium, + ), + this._log, + 'DOWNLOAD', + ) + + downloadSmall = new MultiSessionConnection( + this.__baseConnectionParams(), + this.manager._connectionCount( + 'downloadSmall', + this._dc.id, + this.manager.params.isPremium, + ), + this._log, + 'DOWNLOAD_SMALL', ) constructor( readonly manager: NetworkManager, readonly dcId: number, - private _dc: tl.RawDcOption, + readonly _dc: tl.RawDcOption, + public isPrimary = false, ) { - this._setupMulti(this.mainConnection, 'main') + this._log.prefix = `[DC ${dcId}] ` + + const mainParams = this.__baseConnectionParams() + mainParams.isMainConnection = true + + if (isPrimary) { + mainParams.inactivityTimeout = undefined + } + + this.main = new MultiSessionConnection( + mainParams, + this.manager._connectionCount( + 'main', + this._dc.id, + this.manager.params.isPremium, + ), + this._log, + 'MAIN', + ) + + this._setupMulti('main') + this._setupMulti('upload') + this._setupMulti('download') + this._setupMulti('downloadSmall') } - private _setupMulti( - connection: MultiSessionConnection, - kind: ConnectionKind, - ): void { + private _setupMulti(kind: ConnectionKind): void { + const connection = this[kind] + connection.on('key-change', (idx, key) => { if (kind !== 'main') { // main connection is responsible for authorization, // and keys are then sent to other connections this.manager._log.warn( - 'got key-change from non-main connection', + 'got key-change from non-main connection, ignoring', ) return @@ -115,12 +249,20 @@ export class DcConnectionManager { this.manager._storage.setAuthKeyFor(this.dcId, key) // send key to other connections - // todo + Promise.all([ + this.upload.setAuthKey(key), + this.download.setAuthKey(key), + this.downloadSmall.setAuthKey(key), + ]).then(() => { + this.upload.notifyKeyChange() + this.download.notifyKeyChange() + this.downloadSmall.notifyKeyChange() + }) }) connection.on('tmp-key-change', (idx, key, expires) => { if (kind !== 'main') { this.manager._log.warn( - 'got tmp-key-change from non-main connection', + 'got tmp-key-change from non-main connection, ignoring', ) return @@ -137,6 +279,17 @@ export class DcConnectionManager { key, expires * 1000, ) + + // send key to other connections + Promise.all([ + this.upload.setAuthKey(key, true), + this.download.setAuthKey(key, true), + this.downloadSmall.setAuthKey(key, true), + ]).then(() => { + this.upload.notifyKeyChange() + this.download.notifyKeyChange() + this.downloadSmall.notifyKeyChange() + }) }) connection.on('auth-begin', () => { @@ -144,7 +297,7 @@ export class DcConnectionManager { // to avoid them sending requests before auth is complete if (kind !== 'main') { this.manager._log.warn( - 'got auth-begin from non-main connection', + 'got auth-begin from non-main connection, ignoring', ) return @@ -156,24 +309,58 @@ export class DcConnectionManager { }) connection.on('request-auth', () => { - this.mainConnection.requestAuth() + this.main.requestAuth() }) } - async loadKeys(): Promise { + setIsPrimary(isPrimary: boolean): void { + if (this.isPrimary === isPrimary) return + this.isPrimary = isPrimary + + if (isPrimary) { + this.main.setInactivityTimeout(undefined) + } else { + this.main.setInactivityTimeout( + this.manager.params.inactivityTimeout ?? 60_000, + ) + } + } + + async loadKeys(): Promise { const permanent = await this.manager._storage.getAuthKeyFor(this.dcId) - await this.mainConnection.setAuthKey(permanent) + await Promise.all([ + this.main.setAuthKey(permanent), + this.upload.setAuthKey(permanent), + this.download.setAuthKey(permanent), + this.downloadSmall.setAuthKey(permanent), + ]) + + if (!permanent) { + return false + } if (this.manager.params.usePfs) { - for (let i = 0; i < this.mainConnection._sessions.length; i++) { - const temp = await this.manager._storage.getAuthKeyFor( - this.dcId, - i, - ) - await this.mainConnection.setAuthKey(temp, true, i) - } + await Promise.all( + this.main._sessions.map(async (_, i) => { + const temp = await this.manager._storage.getAuthKeyFor( + this.dcId, + i, + ) + await this.main.setAuthKey(temp, true, i) + + if (i === 0) { + await Promise.all([ + this.upload.setAuthKey(temp, true), + this.download.setAuthKey(temp, true), + this.downloadSmall.setAuthKey(temp, true), + ]) + } + }), + ) } + + return true } } @@ -184,6 +371,7 @@ export class NetworkManager { readonly _initConnectionParams: tl.RawInitConnectionRequest readonly _transportFactory: TransportFactory readonly _reconnectionStrategy: ReconnectionStrategy + readonly _connectionCount: ConnectionCountDelegate protected readonly _dcConnections: Record = {} protected _primaryDc?: DcConnectionManager @@ -252,20 +440,19 @@ export class NetworkManager { this._transportFactory = params.transport ?? defaultTransportFactory this._reconnectionStrategy = params.reconnectionStrategy ?? defaultReconnectionStrategy - - // this._dcConnections[params.defaultDc?.id ?? 2] = - // new DcConnectionManager(this, params.defaultDc?.id ?? 2) + this._connectionCount = + params.connectionCount ?? defaultConnectionCountDelegate } private _switchPrimaryDc(dc: DcConnectionManager) { if (this._primaryDc && this._primaryDc !== dc) { - // todo clean up - return + this._primaryDc.setIsPrimary(false) } this._primaryDc = dc + dc.setIsPrimary(true) - dc.mainConnection.on('usable', () => { + dc.main.on('usable', () => { this._lastUpdateTime = Date.now() if (this._keepAliveInterval) clearInterval(this._keepAliveInterval) @@ -276,19 +463,46 @@ export class NetworkManager { } }, 60_000) }) - dc.mainConnection.on('update', (update) => { + dc.main.on('update', (update) => { this._lastUpdateTime = Date.now() this._updateHandler(update) }) // dc.mainConnection.on('wait', () => // this._cleanupPrimaryConnection() // ) - dc.mainConnection.on('error', (err, conn) => - this.params._emitError(err, conn), - ) + + dc.main.on('error', (err, conn) => this.params._emitError(err, conn)) + dc.loadKeys() .catch((e) => this.params._emitError(e)) - .then(() => dc.mainConnection.connect()) + .then(() => dc.main.ensureConnected()) + } + + async _getOtherDc(dcId: number): Promise { + if (!this._dcConnections[dcId]) { + this._log.debug('creating new DC %d', dcId) + + const dcOption = await this.config.findOption({ + dcId, + allowIpv6: this.params.useIpv6, + preferIpv6: this.params.useIpv6, + preferMedia: dcId !== this._primaryDc?.dcId, + cdn: false, + }) + + if (!dcOption) { + throw new Error(`Could not find DC ${dcId}`) + } + const dc = new DcConnectionManager(this, dcId, dcOption) + + if (!(await dc.loadKeys())) { + dc.main.requestAuth() + } + + this._dcConnections[dcId] = dc + } + + return this._dcConnections[dcId] } /** @@ -296,23 +510,219 @@ export class NetworkManager { * * @param defaultDc Default DC to connect to */ - connect(defaultDc: tl.RawDcOption): void { + async connect(defaultDc: tl.RawDcOption): Promise { if (this._dcConnections[defaultDc.id]) { // shouldn't happen throw new Error('DC manager already exists') } - this._dcConnections[defaultDc.id] = new DcConnectionManager( - this, - defaultDc.id, - defaultDc, + const dc = new DcConnectionManager(this, defaultDc.id, defaultDc) + this._dcConnections[defaultDc.id] = dc + await this._switchPrimaryDc(dc) + } + + private async _exportAuthTo(manager: DcConnectionManager): Promise { + const auth = await this.call({ + _: 'auth.exportAuthorization', + dcId: manager.dcId, + }) + + // manager.ensureMainConnection() + // + // if (!manager.main._sessions[0]._authKey.ready) { + // await manager.loadKeys() + // } + // + // manager.main.ensureConnected() + + const res = await this.call( + { + _: 'auth.importAuthorization', + id: auth.id, + bytes: auth.bytes, + }, + { manager }, ) - this._switchPrimaryDc(this._dcConnections[defaultDc.id]) + + if (res._ !== 'auth.authorization') { + throw new Error( + `Unexpected response from auth.importAuthorization: ${res._}`, + ) + } + } + + async exportAuth(): Promise { + const dcs: Record = {} + const config = await this.config.get() + + for (const dc of config.dcOptions) { + if (dc.cdn) continue + dcs[dc.id] = dc.id + } + + for (const dc of Object.values(dcs)) { + if (dc === this._primaryDc!.dcId) continue + this._log.debug('exporting auth for dc %d', dc) + + const manager = await this._getOtherDc(dc) + await this._exportAuthTo(manager) + } + } + + async changePrimaryDc(newDc: number): Promise { + if (newDc === this._primaryDc?.dcId) return + + const option = await this.config.findOption({ + dcId: newDc, + allowIpv6: this.params.useIpv6, + preferIpv6: this.params.useIpv6, + cdn: false, + }) + + if (!option) { + throw new Error(`DC ${newDc} not found`) + } + + if (!this._dcConnections[newDc]) { + this._dcConnections[newDc] = new DcConnectionManager( + this, + newDc, + option, + ) + } + + this._storage.setDefaultDc(option) + + this._switchPrimaryDc(this._dcConnections[newDc]) + } + + private _floodWaitedRequests: Record = {} + async call( + message: T, + params?: RpcCallOptions, + stack?: string, + ): Promise { + if (!this._primaryDc) { + throw new Error('Not connected to any DC') + } + + const floodSleepThreshold = + params?.floodSleepThreshold ?? this.params.floodSleepThreshold + const maxRetryCount = params?.maxRetryCount ?? this.params.maxRetryCount + + // do not send requests that are in flood wait + if (message._ in this._floodWaitedRequests) { + const delta = this._floodWaitedRequests[message._] - Date.now() + + if (delta <= 3000) { + // flood waits below 3 seconds are "ignored" + delete this._floodWaitedRequests[message._] + } else if (delta <= this.params.floodSleepThreshold) { + await sleep(delta) + delete this._floodWaitedRequests[message._] + } else { + throw new tl.errors.FloodWaitXError(delta / 1000) + } + } + + let lastError: Error | null = null + + const kind = params?.kind ?? 'main' + let manager: DcConnectionManager + + if (params?.manager) { + manager = params.manager + } else if (params?.dcId && params.dcId !== this._primaryDc.dcId) { + manager = await this._getOtherDc(params.dcId) + } else { + manager = this._primaryDc + } + + let multi = manager[kind] + + for (let i = 0; i < maxRetryCount; i++) { + try { + const res = await multi.sendRpc(message, stack, params?.timeout) + + if (kind === 'main') { + this._lastUpdateTime = Date.now() + } + + return res + // eslint-disable-next-line @typescript-eslint/no-explicit-any + } catch (e: any) { + lastError = e + + if (e instanceof tl.errors.InternalError) { + this._log.warn('Telegram is having internal issues: %s', e) + + if (e.message === 'WORKER_BUSY_TOO_LONG_RETRY') { + // according to tdlib, "it is dangerous to resend query without timeout, so use 1" + await sleep(1000) + } + continue + } + + if ( + e.constructor === tl.errors.FloodWaitXError || + e.constructor === tl.errors.SlowmodeWaitXError || + e.constructor === tl.errors.FloodTestPhoneWaitXError + ) { + if (e.constructor !== tl.errors.SlowmodeWaitXError) { + // SLOW_MODE_WAIT is chat-specific, not request-specific + this._floodWaitedRequests[message._] = + Date.now() + e.seconds * 1000 + } + + // In test servers, FLOOD_WAIT_0 has been observed, and sleeping for + // such a short amount will cause retries very fast leading to issues + if (e.seconds === 0) { + (e as tl.Mutable).seconds = 1 + } + + if (e.seconds <= floodSleepThreshold) { + this._log.info('Flood wait for %d seconds', e.seconds) + await sleep(e.seconds * 1000) + continue + } + } + + if (manager === this._primaryDc) { + if ( + e.constructor === tl.errors.PhoneMigrateXError || + e.constructor === tl.errors.UserMigrateXError || + e.constructor === tl.errors.NetworkMigrateXError + ) { + this._log.info('Migrate error, new dc = %d', e.new_dc) + + await this.changePrimaryDc(e.new_dc) + manager = this._primaryDc! + multi = manager[kind] + + continue + } + } else if ( + e.constructor === tl.errors.AuthKeyUnregisteredError + ) { + // we can try re-exporting auth from the primary connection + this._log.warn( + 'exported auth key error, trying re-exporting..', + ) + + await this._exportAuthTo(manager) + continue + } + + throw e + } + } + + throw lastError } destroy(): void { for (const dc of Object.values(this._dcConnections)) { - dc.mainConnection.destroy() + dc.main.destroy() } if (this._keepAliveInterval) clearInterval(this._keepAliveInterval) } diff --git a/packages/core/src/network/persistent-connection.ts b/packages/core/src/network/persistent-connection.ts index 4ca54c94..cc31261d 100644 --- a/packages/core/src/network/persistent-connection.ts +++ b/packages/core/src/network/persistent-connection.ts @@ -2,10 +2,7 @@ import EventEmitter from 'events' import { tl } from '@mtcute/tl' -import { - ICryptoProvider, - Logger, -} from '../utils' +import { ICryptoProvider, Logger } from '../utils' import { ReconnectionStrategy } from './reconnection' import { ITelegramTransport, @@ -45,7 +42,7 @@ export abstract class PersistentConnection extends EventEmitter { // inactivity timeout private _inactivityTimeout: NodeJS.Timeout | null = null - private _inactive = false + private _inactive = true _destroyed = false _usable = false @@ -63,19 +60,14 @@ export abstract class PersistentConnection extends EventEmitter { super() this.params = params this.changeTransport(params.transportFactory) - this._updateLogPrefix() + + this.log.prefix = `[UID ${this._uid}] ` + + this._onInactivityTimeout = this._onInactivityTimeout.bind(this) } - private _updateLogPrefix() { - this.log.prefix = `[UID ${this._uid}, DC ${this.params.dc.id}] ` - } - - async changeDc(dc: tl.RawDcOption): Promise { - this.log.debug('dc changed to: %j', dc) - - this.params.dc = dc - this._updateLogPrefix() - this.reconnect() + get isConnected(): boolean { + return this._transport.state() !== TransportState.Idle } changeTransport(factory: TransportFactory): void { @@ -139,7 +131,9 @@ export abstract class PersistentConnection extends EventEmitter { this._previousWait = wait - if (this._reconnectionTimeout != null) { clearTimeout(this._reconnectionTimeout) } + if (this._reconnectionTimeout != null) { + clearTimeout(this._reconnectionTimeout) + } this._reconnectionTimeout = setTimeout(() => { if (this._destroyed) return this._reconnectionTimeout = null @@ -148,10 +142,14 @@ export abstract class PersistentConnection extends EventEmitter { } connect(): void { - if (this._transport.state() !== TransportState.Idle) { throw new Error('Connection is already opened!') } + if (this.isConnected) { + throw new Error('Connection is already opened!') + } if (this._destroyed) throw new Error('Connection is already destroyed!') - if (this._reconnectionTimeout != null) { clearTimeout(this._reconnectionTimeout) } + if (this._reconnectionTimeout != null) { + clearTimeout(this._reconnectionTimeout) + } this._inactive = false this._transport.connect(this.params.dc, this.params.testMode) @@ -162,8 +160,12 @@ export abstract class PersistentConnection extends EventEmitter { } destroy(): void { - if (this._reconnectionTimeout != null) { clearTimeout(this._reconnectionTimeout) } - if (this._inactivityTimeout != null) { clearTimeout(this._inactivityTimeout) } + if (this._reconnectionTimeout != null) { + clearTimeout(this._reconnectionTimeout) + } + if (this._inactivityTimeout != null) { + clearTimeout(this._inactivityTimeout) + } this._transport.close() this._transport.removeAllListeners() @@ -173,15 +175,32 @@ export abstract class PersistentConnection extends EventEmitter { protected _rescheduleInactivity(): void { if (!this.params.inactivityTimeout) return if (this._inactivityTimeout) clearTimeout(this._inactivityTimeout) - this._inactivityTimeout = setTimeout(() => { - this.log.info( - 'disconnected because of inactivity for %d', - this.params.inactivityTimeout, - ) - this._inactive = true - this._inactivityTimeout = null - this._transport.close() - }, this.params.inactivityTimeout) + this._inactivityTimeout = setTimeout( + this._onInactivityTimeout, + this.params.inactivityTimeout, + ) + } + + protected _onInactivityTimeout(): void { + this.log.info( + 'disconnected because of inactivity for %d', + this.params.inactivityTimeout, + ) + this._inactive = true + this._inactivityTimeout = null + this._transport.close() + } + + setInactivityTimeout(timeout?: number): void { + this.params.inactivityTimeout = timeout + + if (this._inactivityTimeout) { + clearTimeout(this._inactivityTimeout) + } + + if (timeout) { + this._rescheduleInactivity() + } } async send(data: Buffer): Promise { diff --git a/packages/core/src/network/session-connection.ts b/packages/core/src/network/session-connection.ts index 6096088e..c17fefcf 100644 --- a/packages/core/src/network/session-connection.ts +++ b/packages/core/src/network/session-connection.ts @@ -102,12 +102,6 @@ export class SessionConnection extends PersistentConnection { this._handleRawMessage = this._handleRawMessage.bind(this) } - async changeDc(dc: tl.RawDcOption, authKey?: Buffer | null): Promise { - this._session.reset() - await this._session._authKey.setup(authKey) - await super.changeDc(dc) - } - getAuthKey(temp = false): Buffer | null { const key = temp ? this._session._authKeyTemp : this._session._authKey @@ -135,10 +129,12 @@ export class SessionConnection extends PersistentConnection { onTransportClose(): void { super.onTransportClose() - Object.values(this._pendingWaitForUnencrypted).forEach(([prom, timeout]) => { - prom.reject(new Error('Connection closed')) - clearTimeout(timeout) - }) + Object.values(this._pendingWaitForUnencrypted).forEach( + ([prom, timeout]) => { + prom.reject(new Error('Connection closed')) + clearTimeout(timeout) + }, + ) this.emit('disconnect') @@ -174,7 +170,9 @@ export class SessionConnection extends PersistentConnection { this.log.info('no perm auth key, authorizing...') this._authorize() - // todo: if we use pfs, we can also start temp key exchange here + // if we use pfs, we *could* also start temp key exchange here + // but telegram restricts us to only have one auth session per connection, + // and having a separate connection for pfs is not worth it return } @@ -217,7 +215,9 @@ export class SessionConnection extends PersistentConnection { return } else if (this._isPfsBindingPending) { - this.log.info('transport error 404, pfs binding in progress') + this.log.info( + 'transport error 404, pfs binding in progress', + ) this._onAllFailed('temp key expired, binding pending') @@ -338,7 +338,9 @@ export class SessionConnection extends PersistentConnection { doAuthorization(this, this.params.crypto, TEMP_AUTH_KEY_EXPIRY) .then(async ([tempAuthKey, tempServerSalt]) => { if (!this._usePfs) { - this.log.info('pfs has been disabled while generating temp key') + this.log.info( + 'pfs has been disabled while generating temp key', + ) return } @@ -397,7 +399,9 @@ export class SessionConnection extends PersistentConnection { encryptedData, ]) - const promise = createControllablePromise() + const promise = createControllablePromise< + mtp.RawMt_rpc_error | boolean + >() // encrypt the message using temp key and same msg id // this is a bit of a hack, but it works @@ -449,7 +453,9 @@ export class SessionConnection extends PersistentConnection { this._session.pendingMessages.delete(msgId) if (!this._usePfs) { - this.log.info('pfs has been disabled while binding temp key') + this.log.info( + 'pfs has been disabled while binding temp key', + ) return } @@ -526,7 +532,8 @@ export class SessionConnection extends PersistentConnection { // auth_key_id = 0, meaning it's an unencrypted message used for authorization if (this._pendingWaitForUnencrypted.length) { - const [promise, timeout] = this._pendingWaitForUnencrypted.shift()! + const [promise, timeout] = + this._pendingWaitForUnencrypted.shift()! clearTimeout(timeout) promise.resolve(data) } else { @@ -583,23 +590,17 @@ export class SessionConnection extends PersistentConnection { for (let i = 0; i < count; i++) { // msg_id:long seqno:int bytes:int const msgId = message.long() - message.uint() // seqno + const seqNo = message.uint() // seqno const length = message.uint() - // container can't contain other containers, so we are safe - const start = message.pos - const obj = message.object() + // container can't contain other containers, but can contain rpc_result + const obj = message.raw(length) - // ensure length - if (message.pos - start !== length) { - this.log.warn( - 'received message with invalid length in container (%d != %d)', - message.pos - start, - length, - ) - } - - this._handleMessage(msgId, obj) + this._handleRawMessage( + msgId, + seqNo, + new TlBinaryReader(this._readerMap, obj), + ) } return @@ -609,6 +610,8 @@ export class SessionConnection extends PersistentConnection { // rpc_result message.uint() + this._sendAck(messageId) + return this._onRpcResult(message) } @@ -743,7 +746,7 @@ export class SessionConnection extends PersistentConnection { resultType = message.peekUint() } this.log.warn( - 'received rpc_result with %s with req_msg_id = 0', + 'received rpc_result with %j with req_msg_id = 0', resultType, ) @@ -760,15 +763,11 @@ export class SessionConnection extends PersistentConnection { } catch (err) { result = '[failed to parse]' } - this.log.warn( - 'received rpc_result with %s with req_msg_id = 0', - result, - ) // check if the msg is one of the recent ones if (this._session.recentOutgoingMsgIds.has(reqMsgId)) { this.log.debug( - 'received rpc_result again for %l (contains %s)', + 'received rpc_result again for %l (contains %j)', reqMsgId, result, ) @@ -786,7 +785,7 @@ export class SessionConnection extends PersistentConnection { // special case for auth key binding if (msg._ !== 'rpc') { if (msg._ === 'bind') { - msg.promise.resolve(result) + msg.promise.resolve(message.object()) return } @@ -1387,7 +1386,9 @@ export class SessionConnection extends PersistentConnection { } private _enqueueRpc(rpc: PendingRpc, force?: boolean) { - if (this._session.enqueueRpc(rpc, force)) { this._flushTimer.emitWhenIdle() } + if (this._session.enqueueRpc(rpc, force)) { + this._flushTimer.emitWhenIdle() + } } _resetSession(): void { @@ -1575,18 +1576,57 @@ export class SessionConnection extends PersistentConnection { } } - private _flush(): void { + private get _hasPendingServiceMessages(): boolean { + return Boolean( + this._session.queuedRpc.length || + this._session.queuedAcks.length || + this._session.queuedStateReq.length || + this._session.queuedResendReq.length, + ) + } + + protected _onInactivityTimeout() { + // we should send all pending acks and other service messages + // before dropping the connection + + if (!this._hasPendingServiceMessages) { + this.log.debug('no pending service messages, closing connection') + super._onInactivityTimeout() + + return + } + + this._flush(() => { + if (this._hasPendingServiceMessages) { + // the callback will be called again once all pending messages are sent + return + } + + this.log.debug('pending service messages sent, closing connection') + this._flushTimer.reset() + super._onInactivityTimeout() + }) + } + + private _flush(callback?: () => void): void { if ( !this._session._authKey.ready || this._isPfsBindingPending || this._current429Timeout ) { + this.log.debug( + 'skipping flush, connection is not usable (auth key ready = %b, pfs binding pending = %b, 429 timeout = %b)', + this._session._authKey.ready, + this._isPfsBindingPending, + Boolean(this._current429Timeout), + ) + // it will be flushed once connection is usable return } try { - this._doFlush() + this._doFlush(callback) } catch (e: any) { this.log.error('flush error: %s', e.stack) // should not happen unless there's a bug in the code @@ -1601,13 +1641,13 @@ export class SessionConnection extends PersistentConnection { this._session.queuedStateReq.length || this._session.queuedResendReq.length ) { - this._flush() + this._flush(callback) } else { this._flushTimer.emitBefore(this._lastPingTime + 60000) } } - private _doFlush(): void { + private _doFlush(callback?: () => void): void { this.log.debug( 'flushing send queue. queued rpc: %d', this._session.queuedRpc.length, @@ -2015,6 +2055,7 @@ export class SessionConnection extends PersistentConnection { this._session .encryptMessage(result) .then((enc) => this.send(enc)) + .then(callback) .catch((err) => { this.log.error( 'error while sending pending messages (root msg_id = %l): %s', @@ -2023,7 +2064,9 @@ export class SessionConnection extends PersistentConnection { ) // put acks in the front so they are the first to be sent - if (ackMsgIds) { this._session.queuedAcks.splice(0, 0, ...ackMsgIds) } + if (ackMsgIds) { + this._session.queuedAcks.splice(0, 0, ...ackMsgIds) + } this._onMessageFailed(rootMsgId, 'unknown error') }) } diff --git a/packages/core/src/utils/platform/logging.ts b/packages/core/src/utils/platform/logging.ts index e8c09f93..bfac91a5 100644 --- a/packages/core/src/utils/platform/logging.ts +++ b/packages/core/src/utils/platform/logging.ts @@ -2,7 +2,7 @@ import { isatty } from 'tty' const isTty = isatty(process.stdout.fd) -const BASE_FORMAT = isTty ? '[%s] [%s] %s%s\x1b[0m - ' : '[%s] [%s] %s - ' +const BASE_FORMAT = isTty ? '%s [%s] [%s%s\x1b[0m] ' : '%s [%s] [%s] ' const LEVEL_NAMES = isTty ? [ '', // OFF diff --git a/packages/core/src/utils/platform/logging.web.ts b/packages/core/src/utils/platform/logging.web.ts index 0da120e4..a257c30a 100644 --- a/packages/core/src/utils/platform/logging.web.ts +++ b/packages/core/src/utils/platform/logging.web.ts @@ -1,4 +1,4 @@ -const BASE_FORMAT = '[%s] [%с%s%с] %c%s%c - ' +const BASE_FORMAT = '%s [%с%s%с] [%c%s%c] ' const LEVEL_NAMES = [ '', // OFF 'ERR',