diff --git a/packages/core/src/base-client.ts b/packages/core/src/base-client.ts index e62ecd18..3c47eb5b 100644 --- a/packages/core/src/base-client.ts +++ b/packages/core/src/base-client.ts @@ -57,7 +57,7 @@ import { PersistentConnectionParams } from './network/persistent-connection' import { ITelegramStorage, MemoryStorage } from './storage' import { ConfigManager } from './network/config-manager' -import { NetworkManager } from "./network/network-manager"; +import { NetworkManager, NetworkManagerExtraParams } from "./network/network-manager"; export interface BaseTelegramClientOptions { /** @@ -176,14 +176,19 @@ export interface BaseTelegramClientOptions { */ niceStacks?: boolean - /** - * **EXPERT USE ONLY!** - * - * Override TL layer used for the connection. - * - * **Does not** change the schema used. - */ - overrideLayer?: number + /** + * Extra parameters for {@link NetworkManager} + */ + network?: NetworkManagerExtraParams + + /** + * **EXPERT USE ONLY!** + * + * Override TL layer used for the connection. /;' + * + * **Does not** change the schema used. + */ + overrideLayer?: number /** * **EXPERT USE ONLY** @@ -329,6 +334,7 @@ export class BaseTelegramClient extends EventEmitter { storage: this.storage, testMode: this._testMode, transport: opts.transport, + ...(opts.network ?? {}), }, this._config) this.storage.setup?.(this.log, this._readerMap, this._writerMap) diff --git a/packages/core/src/network/auth-key.ts b/packages/core/src/network/auth-key.ts index 5537d40c..55da0f5d 100644 --- a/packages/core/src/network/auth-key.ts +++ b/packages/core/src/network/auth-key.ts @@ -30,6 +30,8 @@ export class AuthKey { this.clientSalt = authKey.slice(88, 120) this.serverSalt = authKey.slice(96, 128) this.id = (await this._crypto.sha1(authKey)).slice(-8) + + this.log.verbose('auth key set up, id = %h', this.id) } async encryptMessage( diff --git a/packages/core/src/network/authorization.ts b/packages/core/src/network/authorization.ts index 53d4f3af..c8e4f039 100644 --- a/packages/core/src/network/authorization.ts +++ b/packages/core/src/network/authorization.ts @@ -102,6 +102,7 @@ async function rsaEncrypt( export async function doAuthorization( connection: SessionConnection, crypto: ICryptoProvider, + expiresIn?: number ): Promise<[Buffer, Long, number]> { // eslint-disable-next-line dot-notation const session = connection['_session'] @@ -128,16 +129,17 @@ export async function doAuthorization( async function readNext(): Promise { return TlBinaryReader.deserializeObject( readerMap, - await connection.waitForNextMessage(), - 20, // skip mtproto header + await connection.waitForUnencryptedMessage(), + 20 // skip mtproto header ) } const log = connection.log.create('auth') + if (expiresIn) log.prefix = '[PFS] ' const nonce = randomBytes(16) // Step 1: PQ request - log.debug('starting PQ handshake, nonce = %h', nonce) + log.debug('starting PQ handshake (temp = %b), nonce = %h', expiresIn, nonce) await sendPlainMessage({ _: 'mt_req_pq_multi', nonce }) const resPq = await readNext() @@ -175,8 +177,8 @@ export async function doAuthorization( if (connection.params.testMode) dcId += 10000 if (connection.params.dc.mediaOnly) dcId = -dcId - const _pqInnerData: mtp.RawMt_p_q_inner_data_dc = { - _: 'mt_p_q_inner_data_dc', + const _pqInnerData: mtp.TypeP_Q_inner_data = { + _: expiresIn ? 'mt_p_q_inner_data_temp_dc' : 'mt_p_q_inner_data_dc', pq: resPq.pq, p, q, @@ -184,6 +186,7 @@ export async function doAuthorization( newNonce, serverNonce: resPq.serverNonce, dc: dcId, + expiresIn: expiresIn! // whatever } const pqInnerData = TlBinaryWriter.serializeObject(writerMap, _pqInnerData) diff --git a/packages/core/src/network/mtproto-session.ts b/packages/core/src/network/mtproto-session.ts index dd5756f7..62ad6cbf 100644 --- a/packages/core/src/network/mtproto-session.ts +++ b/packages/core/src/network/mtproto-session.ts @@ -80,6 +80,10 @@ export type PendingMessage = _: 'future_salts' containerId: Long } + | { + _: 'bind' + promise: ControllablePromise + } /** * Class encapsulating a single MTProto session and storing @@ -89,6 +93,8 @@ export class MtprotoSession { _sessionId = randomLong() _authKey = new AuthKey(this._crypto, this.log, this._readerMap) + _authKeyTemp = new AuthKey(this._crypto, this.log, this._readerMap) + _authKeyTempSecondary = new AuthKey(this._crypto, this.log, this._readerMap) _timeOffset = 0 _lastMessageId = Long.ZERO @@ -114,6 +120,7 @@ export class MtprotoSession { // requests info pendingMessages = new LongMap() + destroySessionIdToMsgId = new LongMap() initConnectionCalled = false @@ -131,6 +138,8 @@ export class MtprotoSession { */ reset(): void { this._authKey.reset() + this._authKeyTemp.reset() + this._authKeyTempSecondary.reset() this.resetState() } @@ -146,7 +155,7 @@ export class MtprotoSession { this._seqNo = 0 this._sessionId = randomLong() - this.log.debug('session reset, new sid = %l', this._sessionId) + this.log.debug('session reset, new sid = %h', this._sessionId) this.log.prefix = `[SESSION ${this._sessionId.toString(16)}] ` // reset session state @@ -222,6 +231,42 @@ export class MtprotoSession { return seqNo } + /** Encrypt a single MTProto message using session's keys */ + async encryptMessage(message: Buffer): Promise { + const key = this._authKeyTemp.ready ? this._authKeyTemp : this._authKey + return key.encryptMessage(message, this.serverSalt, this._sessionId) + } + + /** Decrypt a single MTProto message using session's keys */ + async decryptMessage( + data: Buffer, + callback: Parameters[2] + ): Promise { + if (!this._authKey.ready) throw new Error('Keys are not set up!') + + const authKeyId = data.slice(0, 8) + + let key: AuthKey + if (this._authKey.match(authKeyId)) { + key = this._authKey + } else if (this._authKeyTemp.match(authKeyId)) { + key = this._authKeyTemp + } else if (this._authKeyTempSecondary.match(authKeyId)) { + key = this._authKeyTempSecondary + } else { + this.log.warn( + 'received message with unknown authKey = %h (expected %h or %h or %h)', + authKeyId, + this._authKey.id, + this._authKeyTemp.id, + this._authKeyTempSecondary.id, + ) + return + } + + return key.decryptMessage(data, this._sessionId, callback) + } + writeMessage( writer: TlBinaryWriter, content: tl.TlObject | mtp.TlObject | Buffer, diff --git a/packages/core/src/network/network-manager.ts b/packages/core/src/network/network-manager.ts index 0c7dca82..7cbe7b8c 100644 --- a/packages/core/src/network/network-manager.ts +++ b/packages/core/src/network/network-manager.ts @@ -31,6 +31,7 @@ export class DcConnectionManager { disableUpdates: this.manager.params.disableUpdates, readerMap: this.manager.params.readerMap, writerMap: this.manager.params.writerMap, + usePfs: this.manager.params.usePfs, isMainConnection: false, }) @@ -44,6 +45,10 @@ export class DcConnectionManager { ) } +/** + * Params passed into {@link NetworkManager} by {@link TelegramClient}. + * This type is intended for internal usage only. + */ export interface NetworkManagerParams { storage: ITelegramStorage crypto: ICryptoProvider @@ -63,6 +68,18 @@ export interface NetworkManagerParams { writerMap: TlWriterMap } +/** + * Additional params passed into {@link NetworkManager} by the user + * that customize the behavior of the manager + */ +export interface NetworkManagerExtraParams { + /** + * Whether to use PFS (Perfect Forward Secrecy) for all connections. + * This is disabled by default + */ + usePfs?: boolean +} + export class NetworkManager { readonly _log = this.params.log.create('network') @@ -93,7 +110,7 @@ export class NetworkManager { } constructor( - readonly params: NetworkManagerParams, + readonly params: NetworkManagerParams & NetworkManagerExtraParams, readonly config: ConfigManager ) { let deviceModel = 'mtcute on ' diff --git a/packages/core/src/network/persistent-connection.ts b/packages/core/src/network/persistent-connection.ts index dba76bf7..a931ceda 100644 --- a/packages/core/src/network/persistent-connection.ts +++ b/packages/core/src/network/persistent-connection.ts @@ -30,6 +30,7 @@ let nextConnectionUid = 0 /** * Base class for persistent connections. * Only used for {@link PersistentConnection} and used as a mean of code splitting. + * This class doesn't know anything about MTProto, it just manages the transport. */ export abstract class PersistentConnection extends EventEmitter { private _uid = nextConnectionUid++ @@ -49,9 +50,6 @@ export abstract class PersistentConnection extends EventEmitter { private _inactivityTimeout: NodeJS.Timeout | null = null private _inactive = false - // waitForMessage - private _pendingWaitForMessages: ControllablePromise[] = [] - _destroyed = false _usable = false @@ -91,7 +89,7 @@ export abstract class PersistentConnection extends EventEmitter { this._transport.setup?.(this.params.crypto, this.log) this._transport.on('ready', this.onTransportReady.bind(this)) - this._transport.on('message', this.onTransportMessage.bind(this)) + this._transport.on('message', this.onMessage.bind(this)) this._transport.on('error', this.onTransportError.bind(this)) this._transport.on('close', this.onTransportClose.bind(this)) } @@ -119,32 +117,12 @@ export abstract class PersistentConnection extends EventEmitter { } onTransportError(err: Error): void { - if (this._pendingWaitForMessages.length) { - this._pendingWaitForMessages.shift()!.reject(err) - - return - } - this._lastError = err this.onError(err) // transport is expected to emit `close` after `error` } - onTransportMessage(data: Buffer): void { - if (this._pendingWaitForMessages.length) { - this._pendingWaitForMessages.shift()!.resolve(data) - - return - } - - this.onMessage(data) - } - onTransportClose(): void { - Object.values(this._pendingWaitForMessages).forEach((prom) => - prom.reject(new Error('Connection closed')), - ) - // transport closed because of inactivity // obviously we dont want to reconnect then if (this._inactive) return @@ -219,11 +197,4 @@ export abstract class PersistentConnection extends EventEmitter { this._sendOnceConnected.push(data) } } - - waitForNextMessage(): Promise { - const promise = createControllablePromise() - this._pendingWaitForMessages.push(promise) - - return promise - } } diff --git a/packages/core/src/network/session-connection.ts b/packages/core/src/network/session-connection.ts index c23c4b3e..23d2b5b3 100644 --- a/packages/core/src/network/session-connection.ts +++ b/packages/core/src/network/session-connection.ts @@ -24,6 +24,12 @@ import { randomLong, removeFromLongArray, SortedArray, + EarlyTimer, + ControllablePromise, + createCancellablePromise, + randomBytes, + longFromBuffer, + createControllablePromise, } from '../utils' import { MtprotoSession, PendingMessage, PendingRpc } from './mtproto-session' import { doAuthorization } from './authorization' @@ -33,6 +39,9 @@ import { PersistentConnectionParams, } from './persistent-connection' import { TransportError } from './transports' +import { createAesIgeForMessageOld } from '../utils/crypto/mtproto' + +const TEMP_AUTH_KEY_EXPIRY = 300 // 86400 fixme export interface SessionConnectionParams extends PersistentConnectionParams { initConnection: tl.RawInitConnectionRequest @@ -47,9 +56,8 @@ export interface SessionConnectionParams extends PersistentConnectionParams { writerMap: TlWriterMap } -// destroy_session#e7512126 session_id:long -// todo -const DESTROY_SESSION_ID = Buffer.from('262151e7', 'hex') +// destroy_auth_key#d1435160 = DestroyAuthKeyRes; +const DESTROY_AUTH_KEY = Buffer.from('605134d1', 'hex') function makeNiceStack( error: tl.errors.RpcError, @@ -70,6 +78,12 @@ export class SessionConnection extends PersistentConnection { private _flushTimer = new EarlyTimer() private _queuedDestroySession: Long[] = [] + // waitForMessage + private _pendingWaitForUnencrypted: [ + ControllablePromise, + NodeJS.Timeout + ][] = [] + private _next429Timeout = 1000 private _current429Timeout?: NodeJS.Timeout @@ -112,6 +126,12 @@ export class SessionConnection extends PersistentConnection { onTransportClose(): void { super.onTransportClose() + + Object.values(this._pendingWaitForUnencrypted).forEach(([prom, timeout]) => { + prom.reject(new Error('Connection closed')) + clearTimeout(timeout) + }) + this.emit('disconnect') this.reset() @@ -134,11 +154,16 @@ export class SessionConnection extends PersistentConnection { } protected async onConnected(): Promise { + // check if we have all the needed keys if (!this._session._authKey.ready) { - this.log.debug('no perm auth key, authorizing...') + this.log.info('no perm auth key, authorizing...') this._authorize() + // todo: if we use pfs, we can also start temp key exchange here + } else if (this.params.usePfs && !this._session._authKeyTemp.ready) { + this.log.info('no perm auth key but using pfs, authorizing') + this._authorizePfs() } else { - this.log.debug('auth keys are already available') + this.log.info('auth keys are already available') this.onConnectionUsable() } } @@ -149,6 +174,35 @@ export class SessionConnection extends PersistentConnection { this.log.error('transport error %d', error.code) if (error.code === 404) { + // if we are using pfs, this could be due to the server + // forgetting our temp key (which is kinda weird but expected) + + if (this.params.usePfs) { + if ( + !this._isPfsBindingPending && + this._session._authKeyTemp.ready + ) { + // this is important! we must reset temp auth key before + // we proceed with new temp key derivation. + // otherwise, we can end up in an infinite loop in case it + // was actually perm_key that got 404-ed + // + // if temp key binding is already in process in background, + // _authorizePfs will mark it as foreground to prevent new + // queries from being sent (to avoid even more 404s) + this._session._authKeyTemp.reset() + this._authorizePfs() + this._onAllFailed('temp key expired, binding started') + return + } else if (this._isPfsBindingPending) { + this._onAllFailed('temp key expired, binding pending') + return + } + + // otherwise, 404 must be referencing the perm_key + } + + // there happened a little trolling this._session.reset() this.emit('key-change', null) this._authorize() @@ -156,6 +210,10 @@ export class SessionConnection extends PersistentConnection { return } + this.log.error('transport error %d', error.code) + // all pending queries must be resent + this._onAllFailed(`transport error ${error.code}`) + if (error.code === 429) { // all active queries must be resent const timeout = this._next429Timeout @@ -172,21 +230,11 @@ export class SessionConnection extends PersistentConnection { timeout, ) - for (const msgId of this._session.pendingMessages.keys()) { - const info = this._session.pendingMessages.get(msgId)! - - if (info._ === 'container') { - this._session.pendingMessages.delete(msgId) - } else { - this._onMessageFailed(msgId, 'transport flood', true) - } - } - return } } - this.emit('error', error) + this.emit('err', error) } protected onConnectionUsable() { @@ -205,7 +253,11 @@ export class SessionConnection extends PersistentConnection { this.emit('key-change', authKey) - this.onConnectionUsable() + if (this.params.usePfs) { + return this._authorizePfs() + } else { + this.onConnectionUsable() + } }) .catch((err) => { this.log.error('Authorization error: %s', err.message) @@ -214,7 +266,218 @@ export class SessionConnection extends PersistentConnection { }) } + private _authorizePfs(background = false): void { + if (this._isPfsBindingPending) return + if (this._pfsUpdateTimeout) { + clearTimeout(this._pfsUpdateTimeout) + this._pfsUpdateTimeout = undefined + } + + if (this._isPfsBindingPendingInBackground) { + // e.g. temp key has expired while we were binding a key in the background + // in this case, we shouldn't start pfs binding again, and instead wait for + // current operation to complete + this._isPfsBindingPendingInBackground = false + this._isPfsBindingPending = true + return + } + + if (background) { + this._isPfsBindingPendingInBackground = true + } else { + this._isPfsBindingPending = true + } + + doAuthorization(this, this.params.crypto, TEMP_AUTH_KEY_EXPIRY) + .then(async ([tempAuthKey, tempServerSalt]) => { + const tempKey = await this._session._authKeyTempSecondary + await tempKey.setup(tempAuthKey) + + const msgId = this._session.getMessageId() + + this.log.debug( + 'binding temp_auth_key (%h) to perm_auth_key (%h), msg_id = %l...', + tempKey.id, + this._session._authKey.id, + msgId + ) + + // we now need to bind the key + const inner: mtp.RawMt_bind_auth_key_inner = { + _: 'mt_bind_auth_key_inner', + nonce: randomLong(), + tempAuthKeyId: longFromBuffer(tempKey.id), + permAuthKeyId: longFromBuffer(this._session._authKey.id), + tempSessionId: this._session._sessionId, + expiresAt: + Math.floor(Date.now() / 1000) + TEMP_AUTH_KEY_EXPIRY, + } + + // encrypt using mtproto v1 (fucking kill me plz) + + const writer = TlBinaryWriter.alloc(this.params.writerMap, 80) + // = 40 (inner length) + 32 (mtproto header) + 8 (pad 72 so mod 16 = 0) + + writer.raw(randomBytes(16)) + writer.long(msgId) + writer.int(0) // seq_no + writer.int(40) // msg_len + writer.object(inner) + + const msgWithoutPadding = writer.result() + writer.raw(randomBytes(8)) + const msgWithPadding = writer.result() + + const hash = await this.params.crypto.sha1(msgWithoutPadding) + const msgKey = hash.slice(4, 20) + + const ige = await createAesIgeForMessageOld( + this.params.crypto, + this._session._authKey.key, + msgKey, + true + ) + const encryptedData = await ige.encrypt(msgWithPadding) + const encryptedMessage = Buffer.concat([ + this._session._authKey.id, + msgKey, + encryptedData, + ]) + + const promise = createControllablePromise() + + // encrypt the message using temp key and same msg id + // this is a bit of a hack, but it works + // + // hacking inside main send loop to allow sending + // with another key is just too much hassle. + // we could just always use temp key if one is available, + // but that way we won't be able to refresh the key + // that is about to expire in the background without + // interrupting actual message flow + // decrypting is trivial though, since key id + // is in the first bytes of the message, and is never used later on. + + this._session.pendingMessages.set(msgId, { + _: 'bind', + promise, + }) + + const request: tl.auth.RawBindTempAuthKeyRequest = { + _: 'auth.bindTempAuthKey', + permAuthKeyId: inner.permAuthKeyId, + nonce: inner.nonce, + expiresAt: inner.expiresAt, + encryptedMessage, + } + const reqSize = TlSerializationCounter.countNeededBytes( + this._writerMap, + request + ) + const reqWriter = TlBinaryWriter.alloc( + this._writerMap, + reqSize + 16 + ) + reqWriter.long(this._registerOutgoingMsgId(msgId)) + reqWriter.uint(this._session.getSeqNo()) + reqWriter.uint(reqSize) + reqWriter.object(request) + + // we can now send it as is + const requestEncrypted = await tempKey.encryptMessage( + reqWriter.result(), + tempServerSalt, + this._session._sessionId + ) + await this.send(requestEncrypted) + + const res: mtp.RawMt_rpc_error | boolean = await promise + + this._session.pendingMessages.delete(msgId) + + if (typeof res === 'object') { + this.log.error( + 'failed to bind temp key: %s:%s', + res.errorCode, + res.errorMessage + ) + throw new Error('Failed to bind temporary key') + } + + // now we can swap the keys (secondary becomes primary, + // and primary is not immediately forgot because messages using it may still be in flight) + + this._session._authKeyTempSecondary = this._session._authKeyTemp + this._session._authKeyTemp = tempKey + this._session.serverSalt = tempServerSalt + + this.log.debug( + 'temp key has been bound, exp = %d', + inner.expiresAt + ) + + this._isPfsBindingPending = false + this._isPfsBindingPendingInBackground = false + + // we must re-init connection after binding temp key + this._session.initConnectionCalled = false + + this.emit('tmp-key-change', tempAuthKey, inner.expiresAt) + this.onConnectionUsable() + + // set a timeout to update temp auth key in advance to avoid interruption + this._pfsUpdateTimeout = setTimeout(() => { + this._pfsUpdateTimeout = undefined + this.log.debug('temp key is expiring soon') + this._authorizePfs(true) + }, (TEMP_AUTH_KEY_EXPIRY - 60) * 1000) + }) + .catch((err) => { + this.log.error('PFS Authorization error: %s', err.message) + + if (this._isPfsBindingPendingInBackground) { + this._isPfsBindingPendingInBackground = false + // if we are in background, we can just retry + return this._authorizePfs(true) + } + + this._isPfsBindingPending = false + this.onError(err) + this.reconnect() + }) + } + + waitForUnencryptedMessage(timeout = 5000): Promise { + const promise = createControllablePromise() + const timeoutId = setTimeout(() => { + promise.reject(new Error('Timeout')) + this._pendingWaitForUnencrypted = + this._pendingWaitForUnencrypted.filter( + (it) => it[0] !== promise + ) + }, timeout) + this._pendingWaitForUnencrypted.push([promise, timeoutId]) + + return promise + } + protected async onMessage(data: Buffer): Promise { + if (data.readInt32LE(0) === 0 && data.readInt32LE(4) === 0) { + // auth_key_id = 0, meaning it's an unencrypted message used for authorization + + if (this._pendingWaitForUnencrypted.length) { + const [promise, timeout] = this._pendingWaitForUnencrypted.shift()! + clearTimeout(timeout) + promise.resolve(data) + } else { + this.log.debug( + 'unencrypted message received, but no one is waiting for it' + ) + } + + return + } + if (!this._session._authKey.ready) { // if a message is received before authorization, // either the server is misbehaving, @@ -225,11 +488,7 @@ export class SessionConnection extends PersistentConnection { } try { - await this._session._authKey.decryptMessage( - data, - this._session._sessionId, - this._handleRawMessage - ) + await this._session.decryptMessage(data, this._handleRawMessage) } catch (err) { this.log.error('failed to decrypt message: %s\ndata: %h', err, data) } @@ -376,6 +635,10 @@ export class SessionConnection extends PersistentConnection { message, ) break + case 'mt_destroy_session_ok': + case 'mt_destroy_session_none': + this._onDestroySessionResult(message) + break default: if (tl.isAnyUpdates(message)) { if (this._usable && this.params.inactivityTimeout) { @@ -386,6 +649,8 @@ export class SessionConnection extends PersistentConnection { this.log.warn( 'received updates, but updates are disabled' ) + // likely due to some request in the session missing invokeWithoutUpdates + // todo: reset session break } if (!this.params.isMainConnection) { @@ -457,7 +722,13 @@ export class SessionConnection extends PersistentConnection { return } + // special case for auth key binding if (msg._ !== 'rpc') { + if (msg._ === 'bind') { + msg.promise.resolve(result) + return + } + this.log.error( 'received rpc_result for %s request %l', msg._, @@ -466,6 +737,7 @@ export class SessionConnection extends PersistentConnection { return } + const rpc = msg.rpc const customReader = this._readerMap._results![rpc.method] @@ -475,7 +747,9 @@ export class SessionConnection extends PersistentConnection { // initConnection call was definitely received and // processed by the server, so we no longer need to use it - if (rpc.initConn) this._session.initConnectionCalled = true + if (rpc.initConn) { + this._session.initConnectionCalled = true + } this.log.verbose('<<< (%s) %j', rpc.method, result) @@ -489,6 +763,44 @@ export class SessionConnection extends PersistentConnection { rpc.method, ) + if (res.errorMessage === 'AUTH_KEY_PERM_EMPTY') { + // happens when temp auth key is not yet bound + // this shouldn't happen as we block any outbound communications + // until the temp key is derived and bound. + // + // i think it is also possible for the error to be returned + // when the temp key has expired, but this still shouldn't happen + // but this is tg, so something may go wrong, and we will receive this as an error + // (for god's sake why is this not in mtproto and instead hacked into the app layer) + this._authorizePfs() + this._onMessageFailed(reqMsgId, 'AUTH_KEY_PERM_EMPTY', true) + return + } + + if (res.errorMessage === 'CONNECTION_NOT_INITED') { + // this seems to sometimes happen when using pfs + // no idea why, but tdlib also seems to handle these, so whatever + + this._session.initConnectionCalled = false + this._onMessageFailed(reqMsgId, res.errorMessage, true) + + // just setting this flag is not enough because the message + // is already serialized, so we do this awesome hack + this.sendRpc({ _: 'help.getNearestDc' }) + .then(() => { + this.log.debug( + 'additional help.getNearestDc for initConnection ok' + ) + }) + .catch((err) => { + this.log.debug( + 'additional help.getNearestDc for initConnection error: %s', + err + ) + }) + return + } + if (rpc.cancelled) return const error = tl.errors.createRpcErrorFromTl(res) @@ -567,6 +879,8 @@ export class SessionConnection extends PersistentConnection { this._session.getStateSchedule.remove(rpc) break } + case 'bind': + break // do nothing, wait for the result default: if (!inContainer) { this.log.warn( @@ -578,6 +892,31 @@ export class SessionConnection extends PersistentConnection { } } + private _onAllFailed(reason: string) { + // called when all the pending messages are to be resent + // e.g. when server returns 429 + + // most service messages can be omitted as stale + this._resetLastPing(true) + + for (const msgId of this._session.pendingMessages.keys()) { + const info = this._session.pendingMessages.get(msgId)! + + switch (info._) { + case 'container': + case 'state': + case 'resend': + case 'ping': + // no longer relevant + this._session.pendingMessages.delete(msgId) + break + default: + this._onMessageFailed(msgId, reason, true) + break + } + } + } + private _onMessageFailed( msgId: Long, reason: string, @@ -669,6 +1008,13 @@ export class SessionConnection extends PersistentConnection { this._session.queuedStateReq.splice(0, 0, ...msgInfo.msgIds) this._flushTimer.emitWhenIdle() break + case 'bind': + this.log.debug( + 'temp key binding request %l failed because of %s, retrying', + msgId, + reason + ) + msgInfo.promise.reject(Error(reason)) } this._session.pendingMessages.delete(msgId) @@ -778,6 +1124,8 @@ export class SessionConnection extends PersistentConnection { // something went very wrong, we need to reset the session this.log.error( 'received bad_msg_notification for msg_id = %l, code = %d. session will be reset', + msg.badMsgId, + msg.errorCode ) this._resetSession() break @@ -819,6 +1167,14 @@ export class SessionConnection extends PersistentConnection { for (const msgId of this._session.pendingMessages.keys()) { const val = this._session.pendingMessages.get(msgId)! + if (val._ === 'bind') { + // should NOT happen. + if (msgId.lt(firstMsgId)) { + this._onMessageFailed(msgId, 'received in wrong session') + } + continue + } + if (val._ === 'container') { if (msgId.lt(firstMsgId)) { // all messages in this container will be resent @@ -942,6 +1298,24 @@ export class SessionConnection extends PersistentConnection { this._onMessagesInfo(info.msgIds, msg.info) } + private _onDestroySessionResult(msg: mtp.TypeDestroySessionRes): void { + const reqMsgId = this._session.destroySessionIdToMsgId.get( + msg.sessionId + ) + if (!reqMsgId) { + this.log.warn( + 'received %s for unknown session %h', + msg._, + msg.sessionId + ) + return + } + + this._session.destroySessionIdToMsgId.delete(msg.sessionId) + this._session.pendingMessages.delete(reqMsgId) + this.log.debug('received %s for session %h', msg._, msg.sessionId) + } + private _enqueueRpc(rpc: PendingRpc, force?: boolean) { if (this._session.enqueueRpc(rpc, force)) this._flushTimer.emitWhenIdle() @@ -1133,7 +1507,11 @@ export class SessionConnection extends PersistentConnection { } private _flush(): void { - if (!this._session._authKey.ready || this._current429Timeout) { + if ( + !this._session._authKey.ready || + this._isPfsBindingPending || + this._current429Timeout + ) { // it will be flushed once connection is usable return } @@ -1362,7 +1740,7 @@ export class SessionConnection extends PersistentConnection { const otherPendings: Exclude< PendingMessage, - { _: 'rpc' | 'container' } + { _: 'rpc' | 'container' | 'bind' } >[] = [] if (ackRequest) { @@ -1445,6 +1823,7 @@ export class SessionConnection extends PersistentConnection { containerId: msgId, } this._session.pendingMessages.set(msgId, pending) + this._session.destroySessionIdToMsgId.set(sessionId, msgId) otherPendings.push(pending) }) } @@ -1561,12 +1940,8 @@ export class SessionConnection extends PersistentConnection { rootMsgId, ) - this._session._authKey - .encryptMessage( - result, - this._session.serverSalt, - this._session._sessionId - ) + this._session + .encryptMessage(result) .then((enc) => this.send(enc)) .catch((err) => { this.log.error( diff --git a/packages/core/src/storage/abstract.ts b/packages/core/src/storage/abstract.ts index c80ded2d..88d4ae5b 100644 --- a/packages/core/src/storage/abstract.ts +++ b/packages/core/src/storage/abstract.ts @@ -78,12 +78,27 @@ export interface ITelegramStorage { /** * Get auth_key for a given DC * (returning null will start authorization) + * For temp keys: should also return null if the key has expired + * + * @param dcId DC ID + * @param tempIndex Index of the temporary key (usually 0, used for multi-connections) */ - getAuthKeyFor(dcId: number): MaybeAsync + getAuthKeyFor(dcId: number, tempIndex?: number): MaybeAsync /** * Set auth_key for a given DC */ setAuthKeyFor(dcId: number, key: Buffer | null): MaybeAsync + /** + * Set temp_auth_key for a given DC + * expiresAt is unix time in ms + */ + setTempAuthKeyFor(dcId: number, index: number, key: Buffer | null, expiresAt: number): MaybeAsync + /** + * Remove all saved auth keys (both temp and perm) + * for the given DC. Used when perm_key becomes invalid, + * meaning all temp_keys also become invalid + */ + dropAuthKeysFor(dcId: number): MaybeAsync /** * Get information about currently logged in user (if available) diff --git a/packages/core/src/storage/memory.ts b/packages/core/src/storage/memory.ts index 9fff5a4d..a367c51f 100644 --- a/packages/core/src/storage/memory.ts +++ b/packages/core/src/storage/memory.ts @@ -15,6 +15,8 @@ export interface MemorySessionState { defaultDc: tl.RawDcOption | null authKeys: Record + authKeysTemp: Record + authKeysTempExpiry: Record // marked peer id -> entity info entities: Record @@ -110,6 +112,8 @@ export class MemoryStorage implements ITelegramStorage, IStateStorage { $version: CURRENT_VERSION, defaultDc: null, authKeys: {}, + authKeysTemp: {}, + authKeysTempExpiry: {}, entities: {}, phoneIndex: {}, usernameIndex: {}, @@ -187,14 +191,43 @@ export class MemoryStorage implements ITelegramStorage, IStateStorage { this._state.defaultDc = dc } + setTempAuthKeyFor( + dcId: number, + index: number, + key: Buffer | null, + expiresAt: number + ): void { + const k = `${dcId}:${index}` + this._state.authKeysTemp[k] = key + this._state.authKeysTempExpiry[k] = expiresAt + } + setAuthKeyFor(dcId: number, key: Buffer | null): void { this._state.authKeys[dcId] = key } - getAuthKeyFor(dcId: number): Buffer | null { + getAuthKeyFor(dcId: number, tempIndex?: number): Buffer | null { + if (tempIndex !== undefined) { + const k = `${dcId}:${tempIndex}` + + if (Date.now() > (this._state.authKeysTempExpiry[k] ?? 0)) + return null + return this._state.authKeysTemp[k] + } + return this._state.authKeys[dcId] ?? null } + dropAuthKeysFor(dcId: number): void { + this._state.authKeys[dcId] = null + Object.keys(this._state.authKeysTemp).forEach((key) => { + if (key.startsWith(`${dcId}:`)) { + delete this._state.authKeysTemp[key] + delete this._state.authKeysTempExpiry[key] + } + }) + } + updatePeers(peers: PeerInfoWithUpdated[]): MaybeAsync { for (const peer of peers) { this._cachedFull.set(peer.id, peer.full) diff --git a/packages/core/src/utils/long-utils.ts b/packages/core/src/utils/long-utils.ts index b89b7ffb..afe09fe1 100644 --- a/packages/core/src/utils/long-utils.ts +++ b/packages/core/src/utils/long-utils.ts @@ -16,6 +16,21 @@ export function randomLong(unsigned = false): Long { return new Long(lo, hi, unsigned) } +/** + * Read a Long from a buffer + * + * @param buf Buffer to read from + * @param unsigned Whether the number should be unsigned + * @param le Whether the number is little-endian + */ +export function longFromBuffer(buf: Buffer, unsigned = false, le = true): Long { + if (le) { + return new Long(buf.readInt32LE(0), buf.readInt32LE(4), unsigned) + } else { + return new Long(buf.readInt32BE(4), buf.readInt32BE(0), unsigned) + } +} + /** * Remove a Long from an array * diff --git a/packages/sqlite/index.ts b/packages/sqlite/index.ts index 3774e8aa..5bd88a19 100644 --- a/packages/sqlite/index.ts +++ b/packages/sqlite/index.ts @@ -54,61 +54,64 @@ function getInputPeer( throw new Error(`Invalid peer type: ${row.type}`) } -const CURRENT_VERSION = 2 +const CURRENT_VERSION = 3 -// language=SQLite +// language=SQLite format=false +const TEMP_AUTH_TABLE = ` + create table temp_auth_keys ( + dc integer not null, + idx integer not null, + key blob not null, + expires integer not null, + primary key (dc, idx) + ); +` + +// language=SQLite format=false const SCHEMA = ` - create table kv - ( - key text primary key, + create table kv ( + key text primary key, value text not null ); - create table state - ( - key text primary key, - value text not null, + create table state ( + key text primary key, + value text not null, expires number ); - create table auth_keys - ( - dc integer primary key, + create table auth_keys ( + dc integer primary key, key blob not null ); - create table pts - ( + ${TEMP_AUTH_TABLE} + + create table pts ( channel_id integer primary key, - pts integer not null + pts integer not null ); - create table entities - ( - id integer primary key, - hash text not null, - type text not null, + create table entities ( + id integer primary key, + hash text not null, + type text not null, username text, - phone text, - updated integer not null, - "full" blob + phone text, + updated integer not null, + "full" blob ); create index idx_entities_username on entities (username); create index idx_entities_phone on entities (phone); ` +// language=SQLite format=false const RESET = ` - delete - from kv - where key <> 'ver'; - delete - from state; - delete - from auth_keys; - delete - from pts; - delete - from entities + delete from kv where key <> 'ver'; + delete from state; + delete from auth_keys; + delete from pts; + delete from entities ` const USERNAME_TTL = 86400000 // 24 hours @@ -144,8 +147,14 @@ const STATEMENTS = { delState: 'delete from state where key = ?', getAuth: 'select key from auth_keys where dc = ?', + getAuthTemp: + 'select key from temp_auth_keys where dc = ? and idx = ? and expires > ?', setAuth: 'insert or replace into auth_keys (dc, key) values (?, ?)', + setAuthTemp: + 'insert or replace into temp_auth_keys (dc, idx, key, expires) values (?, ?, ?, ?)', delAuth: 'delete from auth_keys where dc = ?', + delAuthTemp: 'delete from temp_auth_keys where dc = ? and idx = ?', + delAllAuthTemp: 'delete from temp_auth_keys where dc = ?', getPts: 'select pts from pts where channel_id = ?', setPts: 'insert or replace into pts (channel_id, pts) values (?, ?)', @@ -376,6 +385,17 @@ export class SqliteStorage implements ITelegramStorage, IStateStorage { 'Unsupported session version, please migrate manually', ) } + + if (from === 2) { + // PFS support added + this._db.exec(TEMP_AUTH_TABLE) + from = 3 + } + + if (from !== CURRENT_VERSION) { + // an assertion just in case i messed up + throw new Error('Migration incomplete') + } } private _initializeStatements(): void { @@ -481,10 +501,15 @@ export class SqliteStorage implements ITelegramStorage, IStateStorage { return this._getFromKv('def_dc') } - getAuthKeyFor(dcId: number): Buffer | null { - const row = this._statements.getAuth.get(dcId) + getAuthKeyFor(dcId: number, tempIndex?: number): Promise { + let row + if (tempIndex !== undefined) { + row = this._statements.getAuthTemp.get(dcId, tempIndex, Date.now()) + } else { + row = this._statements.getAuth.get(dcId) + } - return row ? (row as { key: Buffer }).key : null + return row ? row.key : null } setAuthKeyFor(dcId: number, key: Buffer | null): void { @@ -494,6 +519,27 @@ export class SqliteStorage implements ITelegramStorage, IStateStorage { ]) } + setTempAuthKeyFor( + dcId: number, + index: number, + key: Buffer | null, + expires: number + ): void { + this._pending.push([ + key === null + ? this._statements.delAuthTemp + : this._statements.setAuthTemp, + key === null ? [dcId, index] : [dcId, index, key, expires], + ]) + } + + dropAuthKeysFor(dcId: number): void { + this._pending.push( + [this._statements.delAuth, [dcId]], + [this._statements.delAllAuthTemp, [dcId]] + ) + } + getSelf(): ITelegramStorage.SelfInfo | null { return this._getFromKv('self') }