build: fixes after rebase

This commit is contained in:
alina 🌸 2023-06-10 00:37:26 +03:00
parent 0b4edbf70e
commit eb585fc3f1
Signed by: teidesu
SSH key fingerprint: SHA256:uNeCpw6aTSU4aIObXLvHfLkDa82HWH9EiOj9AXOIRpI
23 changed files with 301 additions and 337 deletions

View file

@ -47,8 +47,10 @@ export async function checkPassword(
await this._saveStorage() await this._saveStorage()
// telegram ignores invokeWithoutUpdates for auth methods // telegram ignores invokeWithoutUpdates for auth methods
if (this._disableUpdates) this.primaryConnection._resetSession() // todo where is this._disableUpdates?
else this.startUpdatesLoop() // if (this._disableUpdates) this.primaryConnection._resetSession()
// else
this.startUpdatesLoop()
return new User(this, res.user) return new User(this, res.user)
} }

View file

@ -19,7 +19,7 @@ export async function sendCode(
const res = await this.call({ const res = await this.call({
_: 'auth.sendCode', _: 'auth.sendCode',
phoneNumber: phone, phoneNumber: phone,
apiId: this._initConnectionParams.apiId, apiId: this.network._initConnectionParams.apiId,
apiHash: this._apiHash, apiHash: this._apiHash,
settings: { _: 'codeSettings' }, settings: { _: 'codeSettings' },
}) })

View file

@ -17,7 +17,7 @@ export async function signInBot(
const res = await this.call({ const res = await this.call({
_: 'auth.importBotAuthorization', _: 'auth.importBotAuthorization',
flags: 0, flags: 0,
apiId: this._initConnectionParams.apiId, apiId: this.network._initConnectionParams.apiId,
apiHash: this._apiHash, apiHash: this._apiHash,
botAuthToken: token, botAuthToken: token,
}) })
@ -42,8 +42,10 @@ export async function signInBot(
await this._saveStorage() await this._saveStorage()
// telegram ignores invokeWithoutUpdates for auth methods // telegram ignores invokeWithoutUpdates for auth methods
if (this._disableUpdates) this.primaryConnection._resetSession() // todo where is this._disableUpdates?
else this.startUpdatesLoop() // if (this._disableUpdates) this.primaryConnection._resetSession()
// else
this.startUpdatesLoop()
return new User(this, res.user) return new User(this, res.user)
} }

View file

@ -50,8 +50,10 @@ export async function signIn(
await this._saveStorage() await this._saveStorage()
// telegram ignores invokeWithoutUpdates for auth methods // telegram ignores invokeWithoutUpdates for auth methods
if (this._disableUpdates) this.primaryConnection._resetSession() // todo where is this._disableUpdates?
else this.startUpdatesLoop() // if (this._disableUpdates) this.primaryConnection._resetSession()
// else
this.startUpdatesLoop()
return new User(this, res.user) return new User(this, res.user)
} }

View file

@ -40,8 +40,10 @@ export async function signUp(
await this._saveStorage() await this._saveStorage()
// telegram ignores invokeWithoutUpdates for auth methods // telegram ignores invokeWithoutUpdates for auth methods
if (this._disableUpdates) this.primaryConnection._resetSession() // todo where is this._disableUpdates?
else this.startUpdatesLoop() // if (this._disableUpdates) this.primaryConnection._resetSession()
// else
this.startUpdatesLoop()
return new User(this, res.user) return new User(this, res.user)
} }

View file

@ -79,6 +79,7 @@ export async function startTest(
if (!availableDcs.find((dc) => dc.id === id)) { throw new MtArgumentError(`${phone} has invalid DC ID (${id})`) } if (!availableDcs.find((dc) => dc.id === id)) { throw new MtArgumentError(`${phone} has invalid DC ID (${id})`) }
} else { } else {
let dcId = this._defaultDc.id let dcId = this._defaultDc.id
if (params.dcId) { if (params.dcId) {
if (!availableDcs.find((dc) => dc.id === params!.dcId)) { throw new MtArgumentError(`DC ID is invalid (${dcId})`) } if (!availableDcs.find((dc) => dc.id === params!.dcId)) { throw new MtArgumentError(`DC ID is invalid (${dcId})`) }
dcId = params.dcId dcId = params.dcId

View file

@ -155,7 +155,8 @@ export async function start(
me.isBot, me.isBot,
) )
if (!this._disableUpdates) { // todo where is this._disableUpdates?
if (!false) {
this._catchUpChannels = Boolean(params.catchUp) this._catchUpChannels = Boolean(params.catchUp)
if (!params.catchUp) { if (!params.catchUp) {

View file

@ -10,7 +10,6 @@ import {
FileDownloadParameters, FileDownloadParameters,
FileLocation, FileLocation,
MtArgumentError, MtArgumentError,
MtUnsupportedError,
} from '../../types' } from '../../types'
import { determinePartSize } from '../../utils/file-utils' import { determinePartSize } from '../../utils/file-utils'
@ -36,7 +35,7 @@ export async function* downloadAsIterable(
) )
} }
let offset = params.offset ?? 0 const offset = params.offset ?? 0
if (offset % 4096 !== 0) { if (offset % 4096 !== 0) {
throw new MtArgumentError( throw new MtArgumentError(
@ -80,7 +79,7 @@ export async function* downloadAsIterable(
const chunkSize = partSizeKb * 1024 const chunkSize = partSizeKb * 1024
let limit = const limit =
params.limit ?? params.limit ??
// derive limit from chunk size, file size and offset // derive limit from chunk size, file size and offset
(fileSize ? (fileSize ?
@ -88,78 +87,81 @@ export async function* downloadAsIterable(
// we will receive an error when we have reached the end anyway // we will receive an error when we have reached the end anyway
Infinity) Infinity)
let connection = this._downloadConnections[dcId] // fixme
throw new Error('TODO')
if (!connection) { // let connection = this._downloadConnections[dcId]
connection = await this.createAdditionalConnection(dcId)
this._downloadConnections[dcId] = connection
}
const requestCurrent = async (): Promise<Buffer> => { // if (!connection) {
let result: // connection = await this.createAdditionalConnection(dcId)
| tl.RpcCallReturn['upload.getFile'] // this._downloadConnections[dcId] = connection
| tl.RpcCallReturn['upload.getWebFile'] // }
//
try { // const requestCurrent = async (): Promise<Buffer> => {
result = await this.call( // let result:
{ // | tl.RpcCallReturn['upload.getFile']
_: isWeb ? 'upload.getWebFile' : 'upload.getFile', // | tl.RpcCallReturn['upload.getWebFile']
// eslint-disable-next-line @typescript-eslint/no-explicit-any //
location: location as any, // try {
offset, // result = await this.call(
limit: chunkSize, // {
}, // _: isWeb ? 'upload.getWebFile' : 'upload.getFile',
{ connection }, // // eslint-disable-next-line @typescript-eslint/no-explicit-any
) // location: location as any,
// eslint-disable-next-line @typescript-eslint/no-explicit-any // offset,
} catch (e: any) { // limit: chunkSize,
if (e.constructor === tl.errors.FileMigrateXError) { // },
connection = this._downloadConnections[e.new_dc] // { connection },
// )
if (!connection) { // // eslint-disable-next-line @typescript-eslint/no-explicit-any
connection = await this.createAdditionalConnection(e.new_dc) // } catch (e: any) {
this._downloadConnections[e.new_dc] = connection // if (e.constructor === tl.errors.FileMigrateXError) {
} // connection = this._downloadConnections[e.new_dc]
//
return requestCurrent() // if (!connection) {
} else if (e.constructor === tl.errors.FilerefUpgradeNeededError) { // connection = await this.createAdditionalConnection(e.new_dc)
// todo: implement someday // this._downloadConnections[e.new_dc] = connection
// see: https://github.com/LonamiWebs/Telethon/blob/0e8bd8248cc649637b7c392616887c50986427a0/telethon/client/downloads.py#L99 // }
throw new MtUnsupportedError('File ref expired!') //
} else throw e // return requestCurrent()
} // } else if (e.constructor === tl.errors.FilerefUpgradeNeededError) {
// // todo: implement someday
if (result._ === 'upload.fileCdnRedirect') { // // see: https://github.com/LonamiWebs/Telethon/blob/0e8bd8248cc649637b7c392616887c50986427a0/telethon/client/downloads.py#L99
// we shouldnt receive them since cdnSupported is not set in the getFile request. // throw new MtUnsupportedError('File ref expired!')
// also, i couldnt find any media that would be downloaded from cdn, so even if // } else throw e
// i implemented that, i wouldnt be able to test that, so :shrug: // }
throw new MtUnsupportedError( //
'Received CDN redirect, which is not supported (yet)', // if (result._ === 'upload.fileCdnRedirect') {
) // // we shouldnt receive them since cdnSupported is not set in the getFile request.
} // // also, i couldnt find any media that would be downloaded from cdn, so even if
// // i implemented that, i wouldnt be able to test that, so :shrug:
if ( // throw new MtUnsupportedError(
result._ === 'upload.webFile' && // 'Received CDN redirect, which is not supported (yet)',
result.size && // )
limit === Infinity // }
) { //
limit = result.size // if (
} // result._ === 'upload.webFile' &&
// result.size &&
return result.bytes // limit === Infinity
} // ) {
// limit = result.size
for (let i = 0; i < limit; i++) { // }
const buf = await requestCurrent() //
// return result.bytes
if (buf.length === 0) { // }
// we've reached the end //
return // for (let i = 0; i < limit; i++) {
} // const buf = await requestCurrent()
//
yield buf // if (buf.length === 0) {
offset += chunkSize // // we've reached the end
// return
params.progressCallback?.(offset, limit) // }
} //
// yield buf
// offset += chunkSize
//
// params.progressCallback?.(offset, limit)
// }
} }

View file

@ -23,15 +23,17 @@ export async function _normalizeInline(
id = parseInlineMessageId(id) id = parseInlineMessageId(id)
} }
let connection = this.primaryConnection // let connection = this.primaryConnection
//
if (id.dcId !== connection.params.dc.id) { // if (id.dcId !== connection.params.dc.id) {
if (!(id.dcId in this._connectionsForInline)) { // if (!(id.dcId in this._connectionsForInline)) {
this._connectionsForInline[id.dcId] = // this._connectionsForInline[id.dcId] =
await this.createAdditionalConnection(id.dcId) // await this.createAdditionalConnection(id.dcId)
} // }
connection = this._connectionsForInline[id.dcId] // connection = this._connectionsForInline[id.dcId]
} // }
//
return [id, connection] // return [id, connection]
// fixme
throw new Error('TODO')
} }

View file

@ -1211,15 +1211,16 @@ async function _onUpdate(
// we just needed to apply new pts values // we just needed to apply new pts values
return return
case 'updateDcOptions': case 'updateDcOptions':
if (!this._config) { // fixme - forward to ConfigManager
this._config = await this.call({ _: 'help.getConfig' }) // if (!this._config) {
} else { // this._config = await this.call({ _: 'help.getConfig' })
(this._config as tl.Mutable<tl.TypeConfig>).dcOptions = // } else {
upd.dcOptions // (this._config as tl.Mutable<tl.TypeConfig>).dcOptions =
} // upd.dcOptions
// }
break break
case 'updateConfig': case 'updateConfig':
this._config = await this.call({ _: 'help.getConfig' }) // this._config = await this.call({ _: 'help.getConfig' })
break break
case 'updateUserName': case 'updateUserName':
if (upd.userId === this._userId) { if (upd.userId === this._userId) {

View file

@ -7,16 +7,13 @@ import defaultReaderMap from '@mtcute/tl/binary/reader'
import defaultWriterMap from '@mtcute/tl/binary/writer' import defaultWriterMap from '@mtcute/tl/binary/writer'
import { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime' import { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime'
import defaultReaderMap from '@mtcute/tl/binary/reader'
import defaultWriterMap from '@mtcute/tl/binary/writer'
import { import {
defaultReconnectionStrategy,
defaultTransportFactory,
ReconnectionStrategy, ReconnectionStrategy,
SessionConnection, SessionConnection,
TransportFactory, TransportFactory,
} from './network' } from './network'
import { ConfigManager } from './network/config-manager'
import { NetworkManager, NetworkManagerExtraParams } from './network/network-manager'
import { PersistentConnectionParams } from './network/persistent-connection' import { PersistentConnectionParams } from './network/persistent-connection'
import { ITelegramStorage, MemoryStorage } from './storage' import { ITelegramStorage, MemoryStorage } from './storage'
import { MustEqual } from './types' import { MustEqual } from './types'
@ -25,10 +22,6 @@ import {
createControllablePromise, createControllablePromise,
CryptoProviderFactory, CryptoProviderFactory,
defaultCryptoProviderFactory, defaultCryptoProviderFactory,
sleep,
getAllPeersFrom,
LogManager,
toggleChannelIdMark,
defaultProductionDc, defaultProductionDc,
defaultProductionIpv6Dc, defaultProductionIpv6Dc,
defaultTestDc, defaultTestDc,
@ -36,28 +29,11 @@ import {
getAllPeersFrom, getAllPeersFrom,
ICryptoProvider, ICryptoProvider,
LogManager, LogManager,
readStringSession,
sleep, sleep,
toggleChannelIdMark, toggleChannelIdMark,
ControllablePromise, writeStringSession,
createControllablePromise,
readStringSession,
writeStringSession
} from './utils' } from './utils'
import { addPublicKey } from './utils/crypto/keys'
import { readStringSession, writeStringSession } from './utils/string-session'
import {
TransportFactory,
defaultReconnectionStrategy,
ReconnectionStrategy,
defaultTransportFactory,
SessionConnection,
} from './network'
import { PersistentConnectionParams } from './network/persistent-connection'
import { ITelegramStorage, MemoryStorage } from './storage'
import { ConfigManager } from './network/config-manager'
import { NetworkManager, NetworkManagerExtraParams } from "./network/network-manager";
export interface BaseTelegramClientOptions { export interface BaseTelegramClientOptions {
/** /**
@ -184,7 +160,7 @@ export interface BaseTelegramClientOptions {
/** /**
* **EXPERT USE ONLY!** * **EXPERT USE ONLY!**
* *
* Override TL layer used for the connection. /;' * Override TL layer used for the connection.
* *
* **Does not** change the schema used. * **Does not** change the schema used.
*/ */
@ -207,42 +183,42 @@ export interface BaseTelegramClientOptions {
export class BaseTelegramClient extends EventEmitter { export class BaseTelegramClient extends EventEmitter {
/** /**
* Crypto provider taken from {@link BaseTelegramClient.Options.crypto} * Crypto provider taken from {@link BaseTelegramClientOptions.crypto}
*/ */
protected readonly _crypto: ICryptoProvider protected readonly _crypto: ICryptoProvider
/** /**
* Telegram storage taken from {@link BaseTelegramClient.Options.storage} * Telegram storage taken from {@link BaseTelegramClientOptions.storage}
*/ */
readonly storage: ITelegramStorage readonly storage: ITelegramStorage
/** /**
* API hash taken from {@link BaseTelegramClient.Options.apiHash} * API hash taken from {@link BaseTelegramClientOptions.apiHash}
*/ */
protected readonly _apiHash: string protected readonly _apiHash: string
/** /**
* "Use IPv6" taken from {@link BaseTelegramClient.Options.useIpv6} * "Use IPv6" taken from {@link BaseTelegramClientOptions.useIpv6}
*/ */
protected readonly _useIpv6: boolean protected readonly _useIpv6: boolean
/** /**
* "Test mode" taken from {@link BaseTelegramClient.Options.testMode} * "Test mode" taken from {@link BaseTelegramClientOptions.testMode}
*/ */
protected readonly _testMode: boolean protected readonly _testMode: boolean
/** /**
* Flood sleep threshold taken from {@link BaseTelegramClient.Options.floodSleepThreshold} * Flood sleep threshold taken from {@link BaseTelegramClientOptions.floodSleepThreshold}
*/ */
protected readonly _floodSleepThreshold: number protected readonly _floodSleepThreshold: number
/** /**
* RPC retry count taken from {@link BaseTelegramClient.Options.rpcRetryCount} * RPC retry count taken from {@link BaseTelegramClientOptions.rpcRetryCount}
*/ */
protected readonly _rpcRetryCount: number protected readonly _rpcRetryCount: number
/** /**
* Primary DC taken from {@link BaseTelegramClient.Options.defaultDc}, * Primary DC taken from {@link BaseTelegramClientOptions.defaultDc},
* loaded from session or changed by other means (like redirecting). * loaded from session or changed by other means (like redirecting).
*/ */
protected _defaultDc: tl.RawDcOption protected _defaultDc: tl.RawDcOption
@ -256,7 +232,7 @@ export class BaseTelegramClient extends EventEmitter {
private _floodWaitedRequests: Record<string, number> = {} private _floodWaitedRequests: Record<string, number> = {}
protected _config = new ConfigManager(() => protected _config = new ConfigManager(() =>
this.call({ _: 'help.getConfig' }) this.call({ _: 'help.getConfig' }),
) )
private _additionalConnections: SessionConnection[] = [] private _additionalConnections: SessionConnection[] = []
@ -311,8 +287,6 @@ export class BaseTelegramClient extends EventEmitter {
} }
this._defaultDc = dc this._defaultDc = dc
this._reconnectionStrategy =
opts.reconnectionStrategy ?? defaultReconnectionStrategy
this._floodSleepThreshold = opts.floodSleepThreshold ?? 10000 this._floodSleepThreshold = opts.floodSleepThreshold ?? 10000
this._rpcRetryCount = opts.rpcRetryCount ?? 5 this._rpcRetryCount = opts.rpcRetryCount ?? 5
this._niceStacks = opts.niceStacks ?? true this._niceStacks = opts.niceStacks ?? true
@ -350,72 +324,6 @@ export class BaseTelegramClient extends EventEmitter {
await this.storage.save?.() await this.storage.save?.()
} }
protected _keepAliveAction(): void {
if (this._disableUpdates) return
// telegram asks to fetch pending updates
// if there are no updates for 15 minutes.
// core does not have update handling,
// so we just use getState so the server knows
// we still do need updates
this.call({ _: 'updates.getState' }).catch((e) => {
if (!(e instanceof tl.errors.RpcError)) {
this.primaryConnection.reconnect()
}
})
}
private _cleanupPrimaryConnection(forever = false): void {
if (forever && this.primaryConnection) this.primaryConnection.destroy()
if (this._keepAliveInterval) clearInterval(this._keepAliveInterval)
}
private _setupPrimaryConnection(): void {
this._cleanupPrimaryConnection(true)
this.primaryConnection = new SessionConnection(
{
crypto: this._crypto,
initConnection: this._initConnectionParams,
transportFactory: this._transportFactory,
dc: this._primaryDc,
testMode: this._testMode,
reconnectionStrategy: this._reconnectionStrategy,
layer: this._layer,
disableUpdates: this._disableUpdates,
readerMap: this._readerMap,
writerMap: this._writerMap,
},
this.log.create('connection'),
)
this.primaryConnection.on('usable', () => {
this._lastUpdateTime = Date.now()
if (this._keepAliveInterval) clearInterval(this._keepAliveInterval)
this._keepAliveInterval = setInterval(async () => {
if (Date.now() - this._lastUpdateTime > 900_000) {
this._keepAliveAction()
this._lastUpdateTime = Date.now()
}
}, 60_000)
})
this.primaryConnection.on('update', (update) => {
this._lastUpdateTime = Date.now()
this._handleUpdate(update)
})
this.primaryConnection.on('wait', () =>
this._cleanupPrimaryConnection(),
)
this.primaryConnection.on('key-change', async (key) => {
this.storage.setAuthKeyFor(this._primaryDc.id, key)
await this._saveStorage()
})
this.primaryConnection.on('error', (err) =>
this._emitError(err, this.primaryConnection),
)
}
/** /**
* Initialize the connection to the primary DC. * Initialize the connection to the primary DC.
* *
@ -475,7 +383,7 @@ export class BaseTelegramClient extends EventEmitter {
* Wait until this client is usable (i.e. connection is fully ready) * Wait until this client is usable (i.e. connection is fully ready)
*/ */
async waitUntilUsable(): Promise<void> { async waitUntilUsable(): Promise<void> {
return new Promise((resolve) => { return new Promise((_resolve) => {
// todo // todo
// this.primaryConnection.once('usable', resolve) // this.primaryConnection.once('usable', resolve)
}) })
@ -572,6 +480,8 @@ export class BaseTelegramClient extends EventEmitter {
for (let i = 0; i < this._rpcRetryCount; i++) { for (let i = 0; i < this._rpcRetryCount; i++) {
try { try {
// fixme temporary hack
// eslint-disable-next-line dot-notation
const res = await this.network['_primaryDc']!.mainConnection.sendRpc( const res = await this.network['_primaryDc']!.mainConnection.sendRpc(
message, message,
stack, stack,
@ -915,7 +825,7 @@ export class BaseTelegramClient extends EventEmitter {
self: await this.storage.getSelf(), self: await this.storage.getSelf(),
testMode: this._testMode, testMode: this._testMode,
primaryDc: this._defaultDc, primaryDc: this._defaultDc,
authKey: Buffer.from([]) //this.primaryConnection.getAuthKey()!, authKey: Buffer.from([]), //this.primaryConnection.getAuthKey()!,
}) })
} }

View file

@ -1,8 +1,10 @@
import { TlBinaryReader, TlReaderMap } from '@mtcute/tl-runtime'
import { buffersEqual, ICryptoProvider, Logger, randomBytes } from '../utils'
import Long from 'long' import Long from 'long'
import { createAesIgeForMessage } from '../utils/crypto/mtproto'
import { tl } from '@mtcute/tl' import { tl } from '@mtcute/tl'
import { TlBinaryReader, TlReaderMap } from '@mtcute/tl-runtime'
import { buffersEqual, ICryptoProvider, Logger, randomBytes } from '../utils'
import { createAesIgeForMessage } from '../utils/crypto/mtproto'
export class AuthKey { export class AuthKey {
ready = false ready = false
@ -15,7 +17,7 @@ export class AuthKey {
constructor( constructor(
readonly _crypto: ICryptoProvider, readonly _crypto: ICryptoProvider,
readonly log: Logger, readonly log: Logger,
readonly _readerMap: TlReaderMap readonly _readerMap: TlReaderMap,
) {} ) {}
match(keyId: Buffer): boolean { match(keyId: Buffer): boolean {
@ -37,7 +39,7 @@ export class AuthKey {
async encryptMessage( async encryptMessage(
message: Buffer, message: Buffer,
serverSalt: Long, serverSalt: Long,
sessionId: Long sessionId: Long,
): Promise<Buffer> { ): Promise<Buffer> {
if (!this.ready) throw new Error('Keys are not set up!') if (!this.ready) throw new Error('Keys are not set up!')
@ -61,7 +63,7 @@ export class AuthKey {
this._crypto, this._crypto,
this.key, this.key,
messageKey, messageKey,
true true,
) )
const encryptedData = await ige.encrypt(buf) const encryptedData = await ige.encrypt(buf)
@ -71,7 +73,7 @@ export class AuthKey {
async decryptMessage( async decryptMessage(
data: Buffer, data: Buffer,
sessionId: Long, sessionId: Long,
callback: (msgId: tl.Long, seqNo: number, data: TlBinaryReader) => void callback: (msgId: tl.Long, seqNo: number, data: TlBinaryReader) => void,
): Promise<void> { ): Promise<void> {
const messageKey = data.slice(8, 24) const messageKey = data.slice(8, 24)
const encryptedData = data.slice(24) const encryptedData = data.slice(24)
@ -80,13 +82,13 @@ export class AuthKey {
this._crypto, this._crypto,
this.key, this.key,
messageKey, messageKey,
false false,
) )
const innerData = await ige.decrypt(encryptedData) const innerData = await ige.decrypt(encryptedData)
const expectedMessageKey = ( const expectedMessageKey = (
await this._crypto.sha256( await this._crypto.sha256(
Buffer.concat([this.serverSalt, innerData]) Buffer.concat([this.serverSalt, innerData]),
) )
).slice(8, 24) ).slice(8, 24)
@ -94,8 +96,9 @@ export class AuthKey {
this.log.warn( this.log.warn(
'[%h] received message with invalid messageKey = %h (expected %h)', '[%h] received message with invalid messageKey = %h (expected %h)',
messageKey, messageKey,
expectedMessageKey expectedMessageKey,
) )
return return
} }
@ -107,8 +110,9 @@ export class AuthKey {
if (sessionId_.neq(sessionId)) { if (sessionId_.neq(sessionId)) {
this.log.warn( this.log.warn(
'ignoring message with invalid sessionId = %h', 'ignoring message with invalid sessionId = %h',
sessionId_ sessionId_,
) )
return return
} }
@ -119,16 +123,18 @@ export class AuthKey {
this.log.warn( this.log.warn(
'ignoring message with invalid length: %d > %d', 'ignoring message with invalid length: %d > %d',
length, length,
innerData.length - 32 innerData.length - 32,
) )
return return
} }
if (length % 4 !== 0) { if (length % 4 !== 0) {
this.log.warn( this.log.warn(
'ignoring message with invalid length: %d is not a multiple of 4', 'ignoring message with invalid length: %d is not a multiple of 4',
length length,
) )
return return
} }
@ -137,8 +143,9 @@ export class AuthKey {
if (paddingSize < 12 || paddingSize > 1024) { if (paddingSize < 12 || paddingSize > 1024) {
this.log.warn( this.log.warn(
'ignoring message with invalid padding size: %d', 'ignoring message with invalid padding size: %d',
paddingSize paddingSize,
) )
return return
} }

View file

@ -102,7 +102,7 @@ async function rsaEncrypt(
export async function doAuthorization( export async function doAuthorization(
connection: SessionConnection, connection: SessionConnection,
crypto: ICryptoProvider, crypto: ICryptoProvider,
expiresIn?: number expiresIn?: number,
): Promise<[Buffer, Long, number]> { ): Promise<[Buffer, Long, number]> {
// eslint-disable-next-line dot-notation // eslint-disable-next-line dot-notation
const session = connection['_session'] const session = connection['_session']
@ -130,7 +130,7 @@ export async function doAuthorization(
return TlBinaryReader.deserializeObject( return TlBinaryReader.deserializeObject(
readerMap, readerMap,
await connection.waitForUnencryptedMessage(), await connection.waitForUnencryptedMessage(),
20 // skip mtproto header 20, // skip mtproto header
) )
} }
@ -186,7 +186,7 @@ export async function doAuthorization(
newNonce, newNonce,
serverNonce: resPq.serverNonce, serverNonce: resPq.serverNonce,
dc: dcId, dc: dcId,
expiresIn: expiresIn! // whatever expiresIn: expiresIn!, // whatever
} }
const pqInnerData = TlBinaryWriter.serializeObject(writerMap, _pqInnerData) const pqInnerData = TlBinaryWriter.serializeObject(writerMap, _pqInnerData)

View file

@ -28,7 +28,7 @@ export class ConfigManager {
if (this._updateTimeout) clearTimeout(this._updateTimeout) if (this._updateTimeout) clearTimeout(this._updateTimeout)
this._updateTimeout = setTimeout( this._updateTimeout = setTimeout(
() => this.update(), () => this.update(),
(config.expires - Date.now() / 1000) * 1000 (config.expires - Date.now() / 1000) * 1000,
) )
for (const cb of this._listeners) cb(config) for (const cb of this._listeners) cb(config)
@ -50,6 +50,7 @@ export class ConfigManager {
async get(): Promise<tl.RawConfig> { async get(): Promise<tl.RawConfig> {
if (this.isStale) await this.update() if (this.isStale) await this.update()
return this._config! return this._config!
} }

View file

@ -8,21 +8,17 @@ import {
TlWriterMap, TlWriterMap,
} from '@mtcute/tl-runtime' } from '@mtcute/tl-runtime'
import { getRandomInt, ICryptoProvider, Logger, randomLong } from '../utils' import { ControllablePromise,
import { buffersEqual, randomBytes } from '../utils/buffer-utils' Deque,
import { getRandomInt,
ICryptoProvider, ICryptoProvider,
Logger, Logger,
getRandomInt,
randomLong,
ControllablePromise,
LruSet,
Deque,
SortedArray,
LongMap, LongMap,
LruSet,
randomLong,
SortedArray,
} from '../utils' } from '../utils'
import { AuthKey } from './auth-key' import { AuthKey } from './auth-key'
import { createAesIgeForMessage } from '../utils/crypto/mtproto'
export interface PendingRpc { export interface PendingRpc {
method: string method: string
@ -39,7 +35,7 @@ export interface PendingRpc {
initConn?: boolean initConn?: boolean
getState?: number getState?: number
cancelled?: boolean cancelled?: boolean
timeout?: number timeout?: NodeJS.Timeout
} }
export type PendingMessage = export type PendingMessage =
@ -115,7 +111,7 @@ export class MtprotoSession {
queuedCancelReq: Long[] = [] queuedCancelReq: Long[] = []
getStateSchedule = new SortedArray<PendingRpc>( getStateSchedule = new SortedArray<PendingRpc>(
[], [],
(a, b) => a.getState! - b.getState! (a, b) => a.getState! - b.getState!,
) )
// requests info // requests info
@ -200,9 +196,10 @@ export class MtprotoSession {
this.log.debug( this.log.debug(
'enqueued %s for sending (msg_id = %s)', 'enqueued %s for sending (msg_id = %s)',
rpc.method, rpc.method,
rpc.msgId || 'n/a' rpc.msgId || 'n/a',
) )
this.queuedRpc.pushBack(rpc) this.queuedRpc.pushBack(rpc)
return true return true
} }
@ -237,19 +234,21 @@ export class MtprotoSession {
/** Encrypt a single MTProto message using session's keys */ /** Encrypt a single MTProto message using session's keys */
async encryptMessage(message: Buffer): Promise<Buffer> { async encryptMessage(message: Buffer): Promise<Buffer> {
const key = this._authKeyTemp.ready ? this._authKeyTemp : this._authKey const key = this._authKeyTemp.ready ? this._authKeyTemp : this._authKey
return key.encryptMessage(message, this.serverSalt, this._sessionId) return key.encryptMessage(message, this.serverSalt, this._sessionId)
} }
/** Decrypt a single MTProto message using session's keys */ /** Decrypt a single MTProto message using session's keys */
async decryptMessage( async decryptMessage(
data: Buffer, data: Buffer,
callback: Parameters<AuthKey['decryptMessage']>[2] callback: Parameters<AuthKey['decryptMessage']>[2],
): Promise<void> { ): Promise<void> {
if (!this._authKey.ready) throw new Error('Keys are not set up!') if (!this._authKey.ready) throw new Error('Keys are not set up!')
const authKeyId = data.slice(0, 8) const authKeyId = data.slice(0, 8)
let key: AuthKey let key: AuthKey
if (this._authKey.match(authKeyId)) { if (this._authKey.match(authKeyId)) {
key = this._authKey key = this._authKey
} else if (this._authKeyTemp.match(authKeyId)) { } else if (this._authKeyTemp.match(authKeyId)) {
@ -264,6 +263,7 @@ export class MtprotoSession {
this._authKeyTemp.id, this._authKeyTemp.id,
this._authKeyTempSecondary.id, this._authKeyTempSecondary.id,
) )
return return
} }

View file

@ -1,12 +1,13 @@
import EventEmitter from 'events' import EventEmitter from 'events'
import { tl } from '@mtcute/tl' import { tl } from '@mtcute/tl'
import { Logger } from '../utils' import { Logger } from '../utils'
import { MtprotoSession } from './mtproto-session'
import { import {
SessionConnection, SessionConnection,
SessionConnectionParams, SessionConnectionParams,
} from './session-connection' } from './session-connection'
import { MtprotoSession } from './mtproto-session'
export class MultiSessionConnection extends EventEmitter { export class MultiSessionConnection extends EventEmitter {
private _log: Logger private _log: Logger
@ -16,7 +17,7 @@ export class MultiSessionConnection extends EventEmitter {
constructor( constructor(
readonly params: SessionConnectionParams, readonly params: SessionConnectionParams,
private _count: number, private _count: number,
log: Logger log: Logger,
) { ) {
super() super()
this._log = log.create('multi') this._log = log.create('multi')
@ -38,7 +39,7 @@ export class MultiSessionConnection extends EventEmitter {
this._log.debug( this._log.debug(
'updating sessions count: %d -> %d', 'updating sessions count: %d -> %d',
this._sessions.length, this._sessions.length,
this._count this._count,
) )
// there are two cases // there are two cases
@ -52,8 +53,8 @@ export class MultiSessionConnection extends EventEmitter {
this.params.crypto, this.params.crypto,
this._log.create('session'), this._log.create('session'),
this.params.readerMap, this.params.readerMap,
this.params.writerMap this.params.writerMap,
) ),
) )
} }
@ -75,6 +76,7 @@ export class MultiSessionConnection extends EventEmitter {
} }
this._sessions.splice(this._count) this._sessions.splice(this._count)
return return
} }
@ -84,7 +86,7 @@ export class MultiSessionConnection extends EventEmitter {
this.params.crypto, this.params.crypto,
this._log.create('session'), this._log.create('session'),
this.params.readerMap, this.params.readerMap,
this.params.writerMap this.params.writerMap,
) )
// brvh // brvh
@ -101,16 +103,17 @@ export class MultiSessionConnection extends EventEmitter {
this._log.debug( this._log.debug(
'updating connections count: %d -> %d', 'updating connections count: %d -> %d',
this._connections.length, this._connections.length,
this._count this._count,
) )
const newEnforcePfs = this._count > 1 && this.params.isMainConnection const newEnforcePfs = this._count > 1 && this.params.isMainConnection
const enforcePfsChanged = newEnforcePfs !== this._enforcePfs const enforcePfsChanged = newEnforcePfs !== this._enforcePfs
if (enforcePfsChanged) { if (enforcePfsChanged) {
this._log.debug( this._log.debug(
'enforcePfs changed: %s -> %s', 'enforcePfs changed: %s -> %s',
this._enforcePfs, this._enforcePfs,
newEnforcePfs newEnforcePfs,
) )
this._enforcePfs = newEnforcePfs this._enforcePfs = newEnforcePfs
} }
@ -123,6 +126,7 @@ export class MultiSessionConnection extends EventEmitter {
} }
this._connections.splice(this._count) this._connections.splice(this._count)
return return
} }
@ -134,16 +138,16 @@ export class MultiSessionConnection extends EventEmitter {
// create new connections // create new connections
for (let i = this._connections.length; i < this._count; i++) { for (let i = this._connections.length; i < this._count; i++) {
const session = this.params.isMainConnection const session = this.params.isMainConnection ?
? this._sessions[i] this._sessions[i] :
: this._sessions[0] this._sessions[0]
const conn = new SessionConnection( const conn = new SessionConnection(
{ {
...this.params, ...this.params,
usePfs: this.params.usePfs || this._enforcePfs, usePfs: this.params.usePfs || this._enforcePfs,
isMainConnection: this.params.isMainConnection && i === 0, isMainConnection: this.params.isMainConnection && i === 0,
}, },
session session,
) )
conn.on('update', (update) => this.emit('update', update)) conn.on('update', (update) => this.emit('update', update))
@ -158,7 +162,7 @@ export class MultiSessionConnection extends EventEmitter {
} }
}) })
conn.on('tmp-key-change', (key, expires) => conn.on('tmp-key-change', (key, expires) =>
this.emit('tmp-key-change', i, key, expires) this.emit('tmp-key-change', i, key, expires),
) )
conn.on('auth-begin', () => { conn.on('auth-begin', () => {
this._log.debug('received auth-begin from connection %d', i) this._log.debug('received auth-begin from connection %d', i)
@ -189,12 +193,13 @@ export class MultiSessionConnection extends EventEmitter {
sendRpc<T extends tl.RpcMethod>( sendRpc<T extends tl.RpcMethod>(
request: T, request: T,
stack?: string, stack?: string,
timeout?: number timeout?: number,
): Promise<tl.RpcCallReturn[T['_']]> { ): Promise<tl.RpcCallReturn[T['_']]> {
if (this.params.isMainConnection) { if (this.params.isMainConnection) {
// find the least loaded connection // find the least loaded connection
let min = Infinity let min = Infinity
let minIdx = 0 let minIdx = 0
for (let i = 0; i < this._connections.length; i++) { for (let i = 0; i < this._connections.length; i++) {
const conn = this._connections[i] const conn = this._connections[i]
const total = const total =
@ -219,7 +224,7 @@ export class MultiSessionConnection extends EventEmitter {
async changeDc(dc: tl.RawDcOption, authKey?: Buffer | null): Promise<void> { async changeDc(dc: tl.RawDcOption, authKey?: Buffer | null): Promise<void> {
await Promise.all( await Promise.all(
this._connections.map((conn) => conn.changeDc(dc, authKey)) this._connections.map((conn) => conn.changeDc(dc, authKey)),
) )
} }
@ -232,7 +237,7 @@ export class MultiSessionConnection extends EventEmitter {
async setAuthKey( async setAuthKey(
authKey: Buffer | null, authKey: Buffer | null,
temp = false, temp = false,
idx = 0 idx = 0,
): Promise<void> { ): Promise<void> {
const session = this._sessions[idx] const session = this._sessions[idx]
const key = temp ? session._authKeyTemp : session._authKey const key = temp ? session._authKeyTemp : session._authKey

View file

@ -1,20 +1,20 @@
import { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime'
import { tl } from '@mtcute/tl' import { tl } from '@mtcute/tl'
import { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime'
import { ITelegramStorage } from '../storage'
import { ICryptoProvider, Logger } from '../utils' import { ICryptoProvider, Logger } from '../utils'
import { defaultTransportFactory, TransportFactory } from './transports' import { ConfigManager } from './config-manager'
import { MultiSessionConnection } from './multi-session-connection'
import { PersistentConnectionParams } from './persistent-connection'
import { import {
defaultReconnectionStrategy, defaultReconnectionStrategy,
ReconnectionStrategy, ReconnectionStrategy,
} from './reconnection' } from './reconnection'
import { PersistentConnectionParams } from './persistent-connection'
import { ConfigManager } from './config-manager'
import { MultiSessionConnection } from './multi-session-connection'
import { import {
SessionConnection, SessionConnection,
SessionConnectionParams, SessionConnectionParams,
} from './session-connection' } from './session-connection'
import { ITelegramStorage } from '../storage' import { defaultTransportFactory, TransportFactory } from './transports'
export type ConnectionKind = 'main' | 'upload' | 'download' | 'download-small' export type ConnectionKind = 'main' | 'upload' | 'download' | 'download-small'
@ -81,35 +81,36 @@ export class DcConnectionManager {
isMainConnection: true, isMainConnection: true,
}, },
this.manager.params.connectionCount?.main ?? 1, this.manager.params.connectionCount?.main ?? 1,
this.manager._log this.manager._log,
) )
constructor( constructor(
readonly manager: NetworkManager, readonly manager: NetworkManager,
readonly dcId: number, readonly dcId: number,
private _dc: tl.RawDcOption private _dc: tl.RawDcOption,
) { ) {
this._setupMulti(this.mainConnection, 'main') this._setupMulti(this.mainConnection, 'main')
} }
private _setupMulti( private _setupMulti(
connection: MultiSessionConnection, connection: MultiSessionConnection,
kind: ConnectionKind kind: ConnectionKind,
): void { ): void {
connection.on('key-change', (idx, key) => { connection.on('key-change', (idx, key) => {
if (kind !== 'main') { if (kind !== 'main') {
// main connection is responsible for authorization, // main connection is responsible for authorization,
// and keys are then sent to other connections // and keys are then sent to other connections
this.manager._log.warn( this.manager._log.warn(
'got key-change from non-main connection' 'got key-change from non-main connection',
) )
return return
} }
this.manager._log.debug( this.manager._log.debug(
'key change for dc %d from connection %d', 'key change for dc %d from connection %d',
this.dcId, this.dcId,
idx idx,
) )
this.manager._storage.setAuthKeyFor(this.dcId, key) this.manager._storage.setAuthKeyFor(this.dcId, key)
@ -119,21 +120,22 @@ export class DcConnectionManager {
connection.on('tmp-key-change', (idx, key, expires) => { connection.on('tmp-key-change', (idx, key, expires) => {
if (kind !== 'main') { if (kind !== 'main') {
this.manager._log.warn( this.manager._log.warn(
'got tmp-key-change from non-main connection' 'got tmp-key-change from non-main connection',
) )
return return
} }
this.manager._log.debug( this.manager._log.debug(
'temp key change for dc %d from connection %d', 'temp key change for dc %d from connection %d',
this.dcId, this.dcId,
idx idx,
) )
this.manager._storage.setTempAuthKeyFor( this.manager._storage.setTempAuthKeyFor(
this.dcId, this.dcId,
idx, idx,
key, key,
expires * 1000 expires * 1000,
) )
}) })
@ -142,8 +144,9 @@ export class DcConnectionManager {
// to avoid them sending requests before auth is complete // to avoid them sending requests before auth is complete
if (kind !== 'main') { if (kind !== 'main') {
this.manager._log.warn( this.manager._log.warn(
'got auth-begin from non-main connection' 'got auth-begin from non-main connection',
) )
return return
} }
@ -166,7 +169,7 @@ export class DcConnectionManager {
for (let i = 0; i < this.mainConnection._sessions.length; i++) { for (let i = 0; i < this.mainConnection._sessions.length; i++) {
const temp = await this.manager._storage.getAuthKeyFor( const temp = await this.manager._storage.getAuthKeyFor(
this.dcId, this.dcId,
i i,
) )
await this.mainConnection.setAuthKey(temp, true, i) await this.mainConnection.setAuthKey(temp, true, i)
} }
@ -208,19 +211,23 @@ export class NetworkManager {
constructor( constructor(
readonly params: NetworkManagerParams & NetworkManagerExtraParams, readonly params: NetworkManagerParams & NetworkManagerExtraParams,
readonly config: ConfigManager readonly config: ConfigManager,
) { ) {
let deviceModel = 'mtcute on ' let deviceModel = 'mtcute on '
let appVersion = 'unknown' let appVersion = 'unknown'
if (typeof process !== 'undefined' && typeof require !== 'undefined') { if (typeof process !== 'undefined' && typeof require !== 'undefined') {
// eslint-disable-next-line @typescript-eslint/no-var-requires
const os = require('os') const os = require('os')
deviceModel += `${os.type()} ${os.arch()} ${os.release()}` deviceModel += `${os.type()} ${os.arch()} ${os.release()}`
try { try {
// for production builds // for production builds
// eslint-disable-next-line @typescript-eslint/no-var-requires
appVersion = require('../package.json').version appVersion = require('../package.json').version
} catch (e) { } catch (e) {
try { try {
// for development builds (additional /src/ in path) // for development builds (additional /src/ in path)
// eslint-disable-next-line @typescript-eslint/no-var-requires
appVersion = require('../../package.json').version appVersion = require('../../package.json').version
} catch (e) {} } catch (e) {}
} }
@ -238,6 +245,7 @@ export class NetworkManager {
langCode: 'en', langCode: 'en',
...(params.initConnectionOptions ?? {}), ...(params.initConnectionOptions ?? {}),
apiId: params.apiId, apiId: params.apiId,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
query: null as any, query: null as any,
} }
@ -276,7 +284,7 @@ export class NetworkManager {
// this._cleanupPrimaryConnection() // this._cleanupPrimaryConnection()
// ) // )
dc.mainConnection.on('error', (err, conn) => dc.mainConnection.on('error', (err, conn) =>
this.params._emitError(err, conn) this.params._emitError(err, conn),
) )
dc.loadKeys() dc.loadKeys()
.catch((e) => this.params._emitError(e)) .catch((e) => this.params._emitError(e))
@ -297,7 +305,7 @@ export class NetworkManager {
this._dcConnections[defaultDc.id] = new DcConnectionManager( this._dcConnections[defaultDc.id] = new DcConnectionManager(
this, this,
defaultDc.id, defaultDc.id,
defaultDc defaultDc,
) )
this._switchPrimaryDc(this._dcConnections[defaultDc.id]) this._switchPrimaryDc(this._dcConnections[defaultDc.id])
} }

View file

@ -2,10 +2,7 @@ import EventEmitter from 'events'
import { tl } from '@mtcute/tl' import { tl } from '@mtcute/tl'
import { ICryptoProvider, Logger } from '../utils'
import { import {
ControllablePromise,
createControllablePromise,
ICryptoProvider, ICryptoProvider,
Logger, Logger,
} from '../utils' } from '../utils'

View file

@ -1,5 +1,4 @@
/* eslint-disable @typescript-eslint/no-explicit-any */ /* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable @typescript-eslint/no-unused-vars */
// will be reworked in MTQ-32 // will be reworked in MTQ-32
import Long from 'long' import Long from 'long'
@ -16,30 +15,21 @@ import { gzipDeflate, gzipInflate } from '@mtcute/tl-runtime/src/platform/gzip'
import { import {
ControllablePromise, ControllablePromise,
createCancellablePromise, createCancellablePromise,
Deque, createControllablePromise,
EarlyTimer, EarlyTimer,
Logger, longFromBuffer,
LongMap, randomBytes,
LruSet,
randomLong, randomLong,
removeFromLongArray, removeFromLongArray,
SortedArray,
EarlyTimer,
ControllablePromise,
createCancellablePromise,
randomBytes,
longFromBuffer,
createControllablePromise,
} from '../utils' } from '../utils'
import { MtprotoSession, PendingMessage, PendingRpc } from './mtproto-session' import { createAesIgeForMessageOld } from '../utils/crypto/mtproto'
import { doAuthorization } from './authorization' import { doAuthorization } from './authorization'
import { MtprotoSession } from './mtproto-session' import { MtprotoSession, PendingMessage, PendingRpc } from './mtproto-session'
import { import {
PersistentConnection, PersistentConnection,
PersistentConnectionParams, PersistentConnectionParams,
} from './persistent-connection' } from './persistent-connection'
import { TransportError } from './transports' import { TransportError } from './transports'
import { createAesIgeForMessageOld } from '../utils/crypto/mtproto'
const TEMP_AUTH_KEY_EXPIRY = 86400 const TEMP_AUTH_KEY_EXPIRY = 86400
@ -102,7 +92,7 @@ export class SessionConnection extends PersistentConnection {
constructor( constructor(
params: SessionConnectionParams, params: SessionConnectionParams,
readonly _session: MtprotoSession readonly _session: MtprotoSession,
) { ) {
super(params, _session.log.create('conn')) super(params, _session.log.create('conn'))
this._flushTimer.onTimeout(this._flush.bind(this)) this._flushTimer.onTimeout(this._flush.bind(this))
@ -122,6 +112,7 @@ export class SessionConnection extends PersistentConnection {
const key = temp ? this._session._authKeyTemp : this._session._authKey const key = temp ? this._session._authKeyTemp : this._session._authKey
if (!key.ready) return null if (!key.ready) return null
return key.key return key.key
} }
@ -130,6 +121,7 @@ export class SessionConnection extends PersistentConnection {
this.log.debug('use pfs changed to %s', usePfs) this.log.debug('use pfs changed to %s', usePfs)
this._usePfs = usePfs this._usePfs = usePfs
if (!usePfs) { if (!usePfs) {
this._isPfsBindingPending = false this._isPfsBindingPending = false
this._isPfsBindingPendingInBackground = false this._isPfsBindingPendingInBackground = false
@ -174,12 +166,14 @@ export class SessionConnection extends PersistentConnection {
if (!this._session._authKey.ready) { if (!this._session._authKey.ready) {
if (!this.params.isMainConnection) { if (!this.params.isMainConnection) {
this.log.info('no auth key, waiting for main connection') this.log.info('no auth key, waiting for main connection')
// once it is done, we will be notified // once it is done, we will be notified
return return
} }
this.log.info('no perm auth key, authorizing...') this.log.info('no perm auth key, authorizing...')
this._authorize() this._authorize()
// todo: if we use pfs, we can also start temp key exchange here // todo: if we use pfs, we can also start temp key exchange here
return return
} }
@ -187,6 +181,7 @@ export class SessionConnection extends PersistentConnection {
if (this._usePfs && !this._session._authKeyTemp.ready) { if (this._usePfs && !this._session._authKeyTemp.ready) {
this.log.info('no temp auth key but using pfs, authorizing') this.log.info('no temp auth key but using pfs, authorizing')
this._authorizePfs() this._authorizePfs()
return return
} }
@ -219,11 +214,13 @@ export class SessionConnection extends PersistentConnection {
this._session._authKeyTemp.reset() this._session._authKeyTemp.reset()
this._authorizePfs() this._authorizePfs()
this._onAllFailed('temp key expired, binding started') this._onAllFailed('temp key expired, binding started')
return return
} else if (this._isPfsBindingPending) { } else if (this._isPfsBindingPending) {
this.log.info('transport error 404, pfs binding in progress') this.log.info('transport error 404, pfs binding in progress')
this._onAllFailed('temp key expired, binding pending') this._onAllFailed('temp key expired, binding pending')
return return
} }
@ -276,6 +273,7 @@ export class SessionConnection extends PersistentConnection {
_authorize(): void { _authorize(): void {
if (this._session.authorizationPending) { if (this._session.authorizationPending) {
this.log.info('_authorize(): authorization already in progress') this.log.info('_authorize(): authorization already in progress')
return return
} }
@ -283,6 +281,7 @@ export class SessionConnection extends PersistentConnection {
// we don't authorize on non-main connections // we don't authorize on non-main connections
this.log.debug('_authorize(): non-main connection, requesting...') this.log.debug('_authorize(): non-main connection, requesting...')
this.emit('request-auth') this.emit('request-auth')
return return
} }
@ -301,9 +300,8 @@ export class SessionConnection extends PersistentConnection {
if (this._usePfs) { if (this._usePfs) {
return this._authorizePfs() return this._authorizePfs()
} else {
this.onConnectionUsable()
} }
this.onConnectionUsable()
}) })
.catch((err) => { .catch((err) => {
this._session.authorizationPending = false this._session.authorizationPending = false
@ -315,6 +313,7 @@ export class SessionConnection extends PersistentConnection {
private _authorizePfs(background = false): void { private _authorizePfs(background = false): void {
if (this._isPfsBindingPending) return if (this._isPfsBindingPending) return
if (this._pfsUpdateTimeout) { if (this._pfsUpdateTimeout) {
clearTimeout(this._pfsUpdateTimeout) clearTimeout(this._pfsUpdateTimeout)
this._pfsUpdateTimeout = undefined this._pfsUpdateTimeout = undefined
@ -326,6 +325,7 @@ export class SessionConnection extends PersistentConnection {
// current operation to complete // current operation to complete
this._isPfsBindingPendingInBackground = false this._isPfsBindingPendingInBackground = false
this._isPfsBindingPending = true this._isPfsBindingPending = true
return return
} }
@ -339,6 +339,7 @@ export class SessionConnection extends PersistentConnection {
.then(async ([tempAuthKey, tempServerSalt]) => { .then(async ([tempAuthKey, tempServerSalt]) => {
if (!this._usePfs) { if (!this._usePfs) {
this.log.info('pfs has been disabled while generating temp key') this.log.info('pfs has been disabled while generating temp key')
return return
} }
@ -351,7 +352,7 @@ export class SessionConnection extends PersistentConnection {
'binding temp_auth_key (%h) to perm_auth_key (%h), msg_id = %l...', 'binding temp_auth_key (%h) to perm_auth_key (%h), msg_id = %l...',
tempKey.id, tempKey.id,
this._session._authKey.id, this._session._authKey.id,
msgId msgId,
) )
// we now need to bind the key // we now need to bind the key
@ -387,7 +388,7 @@ export class SessionConnection extends PersistentConnection {
this.params.crypto, this.params.crypto,
this._session._authKey.key, this._session._authKey.key,
msgKey, msgKey,
true true,
) )
const encryptedData = await ige.encrypt(msgWithPadding) const encryptedData = await ige.encrypt(msgWithPadding)
const encryptedMessage = Buffer.concat([ const encryptedMessage = Buffer.concat([
@ -396,7 +397,7 @@ export class SessionConnection extends PersistentConnection {
encryptedData, encryptedData,
]) ])
const promise = createControllablePromise() const promise = createControllablePromise<mtp.RawMt_rpc_error | boolean>()
// encrypt the message using temp key and same msg id // encrypt the message using temp key and same msg id
// this is a bit of a hack, but it works // this is a bit of a hack, but it works
@ -424,11 +425,11 @@ export class SessionConnection extends PersistentConnection {
} }
const reqSize = TlSerializationCounter.countNeededBytes( const reqSize = TlSerializationCounter.countNeededBytes(
this._writerMap, this._writerMap,
request request,
) )
const reqWriter = TlBinaryWriter.alloc( const reqWriter = TlBinaryWriter.alloc(
this._writerMap, this._writerMap,
reqSize + 16 reqSize + 16,
) )
reqWriter.long(this._registerOutgoingMsgId(msgId)) reqWriter.long(this._registerOutgoingMsgId(msgId))
reqWriter.uint(this._session.getSeqNo()) reqWriter.uint(this._session.getSeqNo())
@ -439,16 +440,17 @@ export class SessionConnection extends PersistentConnection {
const requestEncrypted = await tempKey.encryptMessage( const requestEncrypted = await tempKey.encryptMessage(
reqWriter.result(), reqWriter.result(),
tempServerSalt, tempServerSalt,
this._session._sessionId this._session._sessionId,
) )
await this.send(requestEncrypted) await this.send(requestEncrypted)
const res: mtp.RawMt_rpc_error | boolean = await promise const res = await promise
this._session.pendingMessages.delete(msgId) this._session.pendingMessages.delete(msgId)
if (!this._usePfs) { if (!this._usePfs) {
this.log.info('pfs has been disabled while binding temp key') this.log.info('pfs has been disabled while binding temp key')
return return
} }
@ -456,7 +458,7 @@ export class SessionConnection extends PersistentConnection {
this.log.error( this.log.error(
'failed to bind temp key: %s:%s', 'failed to bind temp key: %s:%s',
res.errorCode, res.errorCode,
res.errorMessage res.errorMessage,
) )
throw new Error('Failed to bind temporary key') throw new Error('Failed to bind temporary key')
} }
@ -470,7 +472,7 @@ export class SessionConnection extends PersistentConnection {
this.log.debug( this.log.debug(
'temp key has been bound, exp = %d', 'temp key has been bound, exp = %d',
inner.expiresAt inner.expiresAt,
) )
this._isPfsBindingPending = false this._isPfsBindingPending = false
@ -494,6 +496,7 @@ export class SessionConnection extends PersistentConnection {
if (this._isPfsBindingPendingInBackground) { if (this._isPfsBindingPendingInBackground) {
this._isPfsBindingPendingInBackground = false this._isPfsBindingPendingInBackground = false
// if we are in background, we can just retry // if we are in background, we can just retry
return this._authorizePfs(true) return this._authorizePfs(true)
} }
@ -510,7 +513,7 @@ export class SessionConnection extends PersistentConnection {
promise.reject(new Error('Timeout')) promise.reject(new Error('Timeout'))
this._pendingWaitForUnencrypted = this._pendingWaitForUnencrypted =
this._pendingWaitForUnencrypted.filter( this._pendingWaitForUnencrypted.filter(
(it) => it[0] !== promise (it) => it[0] !== promise,
) )
}, timeout) }, timeout)
this._pendingWaitForUnencrypted.push([promise, timeoutId]) this._pendingWaitForUnencrypted.push([promise, timeoutId])
@ -528,7 +531,7 @@ export class SessionConnection extends PersistentConnection {
promise.resolve(data) promise.resolve(data)
} else { } else {
this.log.debug( this.log.debug(
'unencrypted message received, but no one is waiting for it' 'unencrypted message received, but no one is waiting for it',
) )
} }
@ -704,7 +707,7 @@ export class SessionConnection extends PersistentConnection {
if (this.params.disableUpdates) { if (this.params.disableUpdates) {
this.log.warn( this.log.warn(
'received updates, but updates are disabled' 'received updates, but updates are disabled',
) )
// likely due to some request in the session missing invokeWithoutUpdates // likely due to some request in the session missing invokeWithoutUpdates
// todo: reset session // todo: reset session
@ -748,6 +751,7 @@ export class SessionConnection extends PersistentConnection {
} }
const msg = this._session.pendingMessages.get(reqMsgId) const msg = this._session.pendingMessages.get(reqMsgId)
if (!msg) { if (!msg) {
let result let result
@ -783,6 +787,7 @@ export class SessionConnection extends PersistentConnection {
if (msg._ !== 'rpc') { if (msg._ !== 'rpc') {
if (msg._ === 'bind') { if (msg._ === 'bind') {
msg.promise.resolve(result) msg.promise.resolve(result)
return return
} }
@ -831,6 +836,7 @@ export class SessionConnection extends PersistentConnection {
// (for god's sake why is this not in mtproto and instead hacked into the app layer) // (for god's sake why is this not in mtproto and instead hacked into the app layer)
this._authorizePfs() this._authorizePfs()
this._onMessageFailed(reqMsgId, 'AUTH_KEY_PERM_EMPTY', true) this._onMessageFailed(reqMsgId, 'AUTH_KEY_PERM_EMPTY', true)
return return
} }
@ -846,15 +852,16 @@ export class SessionConnection extends PersistentConnection {
this.sendRpc({ _: 'help.getNearestDc' }) this.sendRpc({ _: 'help.getNearestDc' })
.then(() => { .then(() => {
this.log.debug( this.log.debug(
'additional help.getNearestDc for initConnection ok' 'additional help.getNearestDc for initConnection ok',
) )
}) })
.catch((err) => { .catch((err) => {
this.log.debug( this.log.debug(
'additional help.getNearestDc for initConnection error: %s', 'additional help.getNearestDc for initConnection error: %s',
err err,
) )
}) })
return return
} }
@ -938,6 +945,7 @@ export class SessionConnection extends PersistentConnection {
} }
case 'bind': case 'bind':
break // do nothing, wait for the result break // do nothing, wait for the result
default: default:
if (!inContainer) { if (!inContainer) {
this.log.warn( this.log.warn(
@ -980,6 +988,7 @@ export class SessionConnection extends PersistentConnection {
inContainer = false, inContainer = false,
): void { ): void {
const msgInfo = this._session.pendingMessages.get(msgId) const msgInfo = this._session.pendingMessages.get(msgId)
if (!msgInfo) { if (!msgInfo) {
this.log.debug( this.log.debug(
'unknown message %l failed because of %s', 'unknown message %l failed because of %s',
@ -1069,7 +1078,7 @@ export class SessionConnection extends PersistentConnection {
this.log.debug( this.log.debug(
'temp key binding request %l failed because of %s, retrying', 'temp key binding request %l failed because of %s, retrying',
msgId, msgId,
reason reason,
) )
msgInfo.promise.reject(Error(reason)) msgInfo.promise.reject(Error(reason))
} }
@ -1089,6 +1098,7 @@ export class SessionConnection extends PersistentConnection {
private _registerOutgoingMsgId(msgId: Long): Long { private _registerOutgoingMsgId(msgId: Long): Long {
this._session.recentOutgoingMsgIds.add(msgId) this._session.recentOutgoingMsgIds.add(msgId)
return msgId return msgId
} }
@ -1182,7 +1192,7 @@ export class SessionConnection extends PersistentConnection {
this.log.error( this.log.error(
'received bad_msg_notification for msg_id = %l, code = %d. session will be reset', 'received bad_msg_notification for msg_id = %l, code = %d. session will be reset',
msg.badMsgId, msg.badMsgId,
msg.errorCode msg.errorCode,
) )
this._resetSession() this._resetSession()
break break
@ -1258,6 +1268,7 @@ export class SessionConnection extends PersistentConnection {
): void { ): void {
if (!msgId.isZero()) { if (!msgId.isZero()) {
const info = this._session.pendingMessages.get(msgId) const info = this._session.pendingMessages.get(msgId)
if (!info) { if (!info) {
this.log.info( this.log.info(
'received message info about unknown message %l', 'received message info about unknown message %l',
@ -1357,14 +1368,16 @@ export class SessionConnection extends PersistentConnection {
private _onDestroySessionResult(msg: mtp.TypeDestroySessionRes): void { private _onDestroySessionResult(msg: mtp.TypeDestroySessionRes): void {
const reqMsgId = this._session.destroySessionIdToMsgId.get( const reqMsgId = this._session.destroySessionIdToMsgId.get(
msg.sessionId msg.sessionId,
) )
if (!reqMsgId) { if (!reqMsgId) {
this.log.warn( this.log.warn(
'received %s for unknown session %h', 'received %s for unknown session %h',
msg._, msg._,
msg.sessionId msg.sessionId,
) )
return return
} }
@ -1374,8 +1387,7 @@ export class SessionConnection extends PersistentConnection {
} }
private _enqueueRpc(rpc: PendingRpc, force?: boolean) { private _enqueueRpc(rpc: PendingRpc, force?: boolean) {
if (this._session.enqueueRpc(rpc, force)) if (this._session.enqueueRpc(rpc, force)) { this._flushTimer.emitWhenIdle() }
this._flushTimer.emitWhenIdle()
} }
_resetSession(): void { _resetSession(): void {
@ -1598,7 +1610,7 @@ export class SessionConnection extends PersistentConnection {
private _doFlush(): void { private _doFlush(): void {
this.log.debug( this.log.debug(
'flushing send queue. queued rpc: %d', 'flushing send queue. queued rpc: %d',
this._session.queuedRpc.length this._session.queuedRpc.length,
) )
// oh bloody hell mate // oh bloody hell mate
@ -1633,6 +1645,7 @@ export class SessionConnection extends PersistentConnection {
if (this._session.queuedAcks.length) { if (this._session.queuedAcks.length) {
let acks = this._session.queuedAcks let acks = this._session.queuedAcks
if (acks.length > 8192) { if (acks.length > 8192) {
this._session.queuedAcks = acks.slice(8192) this._session.queuedAcks = acks.slice(8192)
acks = acks.slice(0, 8192) acks = acks.slice(0, 8192)
@ -1678,6 +1691,7 @@ export class SessionConnection extends PersistentConnection {
{ {
if (this._session.queuedStateReq.length) { if (this._session.queuedStateReq.length) {
let ids = this._session.queuedStateReq let ids = this._session.queuedStateReq
if (ids.length > 8192) { if (ids.length > 8192) {
this._session.queuedStateReq = ids.slice(8192) this._session.queuedStateReq = ids.slice(8192)
ids = ids.slice(0, 8192) ids = ids.slice(0, 8192)
@ -1695,7 +1709,7 @@ export class SessionConnection extends PersistentConnection {
if (idx > 0) { if (idx > 0) {
const toGetState = this._session.getStateSchedule.raw.splice( const toGetState = this._session.getStateSchedule.raw.splice(
0, 0,
idx idx,
) )
if (!getStateMsgIds) getStateMsgIds = [] if (!getStateMsgIds) getStateMsgIds = []
toGetState.forEach((it) => getStateMsgIds!.push(it.msgId!)) toGetState.forEach((it) => getStateMsgIds!.push(it.msgId!))
@ -1718,6 +1732,7 @@ export class SessionConnection extends PersistentConnection {
if (this._session.queuedResendReq.length) { if (this._session.queuedResendReq.length) {
resendMsgIds = this._session.queuedResendReq resendMsgIds = this._session.queuedResendReq
if (resendMsgIds.length > 8192) { if (resendMsgIds.length > 8192) {
this._session.queuedResendReq = resendMsgIds.slice(8192) this._session.queuedResendReq = resendMsgIds.slice(8192)
resendMsgIds = resendMsgIds.slice(0, 8192) resendMsgIds = resendMsgIds.slice(0, 8192)
@ -2008,8 +2023,7 @@ export class SessionConnection extends PersistentConnection {
) )
// put acks in the front so they are the first to be sent // put acks in the front so they are the first to be sent
if (ackMsgIds) if (ackMsgIds) { this._session.queuedAcks.splice(0, 0, ...ackMsgIds) }
this._session.queuedAcks.splice(0, 0, ...ackMsgIds)
this._onMessageFailed(rootMsgId, 'unknown error') this._onMessageFailed(rootMsgId, 'unknown error')
}) })
} }

View file

@ -195,7 +195,7 @@ export class MemoryStorage implements ITelegramStorage, IStateStorage {
dcId: number, dcId: number,
index: number, index: number,
key: Buffer | null, key: Buffer | null,
expiresAt: number expiresAt: number,
): void { ): void {
const k = `${dcId}:${index}` const k = `${dcId}:${index}`
this._state.authKeysTemp[k] = key this._state.authKeysTemp[k] = key
@ -210,8 +210,8 @@ export class MemoryStorage implements ITelegramStorage, IStateStorage {
if (tempIndex !== undefined) { if (tempIndex !== undefined) {
const k = `${dcId}:${tempIndex}` const k = `${dcId}:${tempIndex}`
if (Date.now() > (this._state.authKeysTempExpiry[k] ?? 0)) if (Date.now() > (this._state.authKeysTempExpiry[k] ?? 0)) { return null }
return null
return this._state.authKeysTemp[k] return this._state.authKeysTemp[k]
} }

View file

@ -73,7 +73,13 @@ export class Logger {
const val = args[idx] const val = args[idx]
args.splice(idx, 1) args.splice(idx, 1)
if (m === '%h') return Buffer.isBuffer(val) ? val.toString('hex') : val.toString(16)
if (m === '%h') {
if (Buffer.isBuffer(val)) return val.toString('hex')
if (typeof val === 'number') return val.toString(16)
return String(val)
}
if (m === '%b') return String(Boolean(val)) if (m === '%b') return String(Boolean(val))
if (m === '%j') { if (m === '%j') {

View file

@ -26,9 +26,9 @@ export function randomLong(unsigned = false): Long {
export function longFromBuffer(buf: Buffer, unsigned = false, le = true): Long { export function longFromBuffer(buf: Buffer, unsigned = false, le = true): Long {
if (le) { if (le) {
return new Long(buf.readInt32LE(0), buf.readInt32LE(4), unsigned) return new Long(buf.readInt32LE(0), buf.readInt32LE(4), unsigned)
} else {
return new Long(buf.readInt32BE(4), buf.readInt32BE(0), unsigned)
} }
return new Long(buf.readInt32BE(4), buf.readInt32BE(0), unsigned)
} }
/** /**

View file

@ -501,15 +501,16 @@ export class SqliteStorage implements ITelegramStorage, IStateStorage {
return this._getFromKv('def_dc') return this._getFromKv('def_dc')
} }
getAuthKeyFor(dcId: number, tempIndex?: number): Promise<Buffer | null> { getAuthKeyFor(dcId: number, tempIndex?: number): Buffer | null {
let row let row
if (tempIndex !== undefined) { if (tempIndex !== undefined) {
row = this._statements.getAuthTemp.get(dcId, tempIndex, Date.now()) row = this._statements.getAuthTemp.get(dcId, tempIndex, Date.now())
} else { } else {
row = this._statements.getAuth.get(dcId) row = this._statements.getAuth.get(dcId)
} }
return row ? row.key : null return row ? (row as { key: Buffer }).key : null
} }
setAuthKeyFor(dcId: number, key: Buffer | null): void { setAuthKeyFor(dcId: number, key: Buffer | null): void {
@ -523,12 +524,12 @@ export class SqliteStorage implements ITelegramStorage, IStateStorage {
dcId: number, dcId: number,
index: number, index: number,
key: Buffer | null, key: Buffer | null,
expires: number expires: number,
): void { ): void {
this._pending.push([ this._pending.push([
key === null key === null ?
? this._statements.delAuthTemp this._statements.delAuthTemp :
: this._statements.setAuthTemp, this._statements.setAuthTemp,
key === null ? [dcId, index] : [dcId, index, key, expires], key === null ? [dcId, index] : [dcId, index, key, expires],
]) ])
} }
@ -536,7 +537,7 @@ export class SqliteStorage implements ITelegramStorage, IStateStorage {
dropAuthKeysFor(dcId: number): void { dropAuthKeysFor(dcId: number): void {
this._pending.push( this._pending.push(
[this._statements.delAuth, [dcId]], [this._statements.delAuth, [dcId]],
[this._statements.delAllAuthTemp, [dcId]] [this._statements.delAllAuthTemp, [dcId]],
) )
} }