feat(core): (initial) support pfs

This commit is contained in:
teidesu 2022-11-07 00:08:59 +03:00 committed by Alina Sireneva
parent c9a86c28f0
commit a23197df91
Signed by: teidesu
SSH key fingerprint: SHA256:uNeCpw6aTSU4aIObXLvHfLkDa82HWH9EiOj9AXOIRpI
11 changed files with 645 additions and 117 deletions

View file

@ -57,7 +57,7 @@ import { PersistentConnectionParams } from './network/persistent-connection'
import { ITelegramStorage, MemoryStorage } from './storage' import { ITelegramStorage, MemoryStorage } from './storage'
import { ConfigManager } from './network/config-manager' import { ConfigManager } from './network/config-manager'
import { NetworkManager } from "./network/network-manager"; import { NetworkManager, NetworkManagerExtraParams } from "./network/network-manager";
export interface BaseTelegramClientOptions { export interface BaseTelegramClientOptions {
/** /**
@ -176,10 +176,15 @@ export interface BaseTelegramClientOptions {
*/ */
niceStacks?: boolean niceStacks?: boolean
/**
* Extra parameters for {@link NetworkManager}
*/
network?: NetworkManagerExtraParams
/** /**
* **EXPERT USE ONLY!** * **EXPERT USE ONLY!**
* *
* Override TL layer used for the connection. * Override TL layer used for the connection. /;'
* *
* **Does not** change the schema used. * **Does not** change the schema used.
*/ */
@ -329,6 +334,7 @@ export class BaseTelegramClient extends EventEmitter {
storage: this.storage, storage: this.storage,
testMode: this._testMode, testMode: this._testMode,
transport: opts.transport, transport: opts.transport,
...(opts.network ?? {}),
}, this._config) }, this._config)
this.storage.setup?.(this.log, this._readerMap, this._writerMap) this.storage.setup?.(this.log, this._readerMap, this._writerMap)

View file

@ -30,6 +30,8 @@ export class AuthKey {
this.clientSalt = authKey.slice(88, 120) this.clientSalt = authKey.slice(88, 120)
this.serverSalt = authKey.slice(96, 128) this.serverSalt = authKey.slice(96, 128)
this.id = (await this._crypto.sha1(authKey)).slice(-8) this.id = (await this._crypto.sha1(authKey)).slice(-8)
this.log.verbose('auth key set up, id = %h', this.id)
} }
async encryptMessage( async encryptMessage(

View file

@ -102,6 +102,7 @@ async function rsaEncrypt(
export async function doAuthorization( export async function doAuthorization(
connection: SessionConnection, connection: SessionConnection,
crypto: ICryptoProvider, crypto: ICryptoProvider,
expiresIn?: number
): Promise<[Buffer, Long, number]> { ): Promise<[Buffer, Long, number]> {
// eslint-disable-next-line dot-notation // eslint-disable-next-line dot-notation
const session = connection['_session'] const session = connection['_session']
@ -128,16 +129,17 @@ export async function doAuthorization(
async function readNext(): Promise<mtp.TlObject> { async function readNext(): Promise<mtp.TlObject> {
return TlBinaryReader.deserializeObject( return TlBinaryReader.deserializeObject(
readerMap, readerMap,
await connection.waitForNextMessage(), await connection.waitForUnencryptedMessage(),
20, // skip mtproto header 20 // skip mtproto header
) )
} }
const log = connection.log.create('auth') const log = connection.log.create('auth')
if (expiresIn) log.prefix = '[PFS] '
const nonce = randomBytes(16) const nonce = randomBytes(16)
// Step 1: PQ request // Step 1: PQ request
log.debug('starting PQ handshake, nonce = %h', nonce) log.debug('starting PQ handshake (temp = %b), nonce = %h', expiresIn, nonce)
await sendPlainMessage({ _: 'mt_req_pq_multi', nonce }) await sendPlainMessage({ _: 'mt_req_pq_multi', nonce })
const resPq = await readNext() const resPq = await readNext()
@ -175,8 +177,8 @@ export async function doAuthorization(
if (connection.params.testMode) dcId += 10000 if (connection.params.testMode) dcId += 10000
if (connection.params.dc.mediaOnly) dcId = -dcId if (connection.params.dc.mediaOnly) dcId = -dcId
const _pqInnerData: mtp.RawMt_p_q_inner_data_dc = { const _pqInnerData: mtp.TypeP_Q_inner_data = {
_: 'mt_p_q_inner_data_dc', _: expiresIn ? 'mt_p_q_inner_data_temp_dc' : 'mt_p_q_inner_data_dc',
pq: resPq.pq, pq: resPq.pq,
p, p,
q, q,
@ -184,6 +186,7 @@ export async function doAuthorization(
newNonce, newNonce,
serverNonce: resPq.serverNonce, serverNonce: resPq.serverNonce,
dc: dcId, dc: dcId,
expiresIn: expiresIn! // whatever
} }
const pqInnerData = TlBinaryWriter.serializeObject(writerMap, _pqInnerData) const pqInnerData = TlBinaryWriter.serializeObject(writerMap, _pqInnerData)

View file

@ -80,6 +80,10 @@ export type PendingMessage =
_: 'future_salts' _: 'future_salts'
containerId: Long containerId: Long
} }
| {
_: 'bind'
promise: ControllablePromise
}
/** /**
* Class encapsulating a single MTProto session and storing * Class encapsulating a single MTProto session and storing
@ -89,6 +93,8 @@ export class MtprotoSession {
_sessionId = randomLong() _sessionId = randomLong()
_authKey = new AuthKey(this._crypto, this.log, this._readerMap) _authKey = new AuthKey(this._crypto, this.log, this._readerMap)
_authKeyTemp = new AuthKey(this._crypto, this.log, this._readerMap)
_authKeyTempSecondary = new AuthKey(this._crypto, this.log, this._readerMap)
_timeOffset = 0 _timeOffset = 0
_lastMessageId = Long.ZERO _lastMessageId = Long.ZERO
@ -114,6 +120,7 @@ export class MtprotoSession {
// requests info // requests info
pendingMessages = new LongMap<PendingMessage>() pendingMessages = new LongMap<PendingMessage>()
destroySessionIdToMsgId = new LongMap<Long>()
initConnectionCalled = false initConnectionCalled = false
@ -131,6 +138,8 @@ export class MtprotoSession {
*/ */
reset(): void { reset(): void {
this._authKey.reset() this._authKey.reset()
this._authKeyTemp.reset()
this._authKeyTempSecondary.reset()
this.resetState() this.resetState()
} }
@ -146,7 +155,7 @@ export class MtprotoSession {
this._seqNo = 0 this._seqNo = 0
this._sessionId = randomLong() this._sessionId = randomLong()
this.log.debug('session reset, new sid = %l', this._sessionId) this.log.debug('session reset, new sid = %h', this._sessionId)
this.log.prefix = `[SESSION ${this._sessionId.toString(16)}] ` this.log.prefix = `[SESSION ${this._sessionId.toString(16)}] `
// reset session state // reset session state
@ -222,6 +231,42 @@ export class MtprotoSession {
return seqNo return seqNo
} }
/** Encrypt a single MTProto message using session's keys */
async encryptMessage(message: Buffer): Promise<Buffer> {
const key = this._authKeyTemp.ready ? this._authKeyTemp : this._authKey
return key.encryptMessage(message, this.serverSalt, this._sessionId)
}
/** Decrypt a single MTProto message using session's keys */
async decryptMessage(
data: Buffer,
callback: Parameters<AuthKey['decryptMessage']>[2]
): Promise<void> {
if (!this._authKey.ready) throw new Error('Keys are not set up!')
const authKeyId = data.slice(0, 8)
let key: AuthKey
if (this._authKey.match(authKeyId)) {
key = this._authKey
} else if (this._authKeyTemp.match(authKeyId)) {
key = this._authKeyTemp
} else if (this._authKeyTempSecondary.match(authKeyId)) {
key = this._authKeyTempSecondary
} else {
this.log.warn(
'received message with unknown authKey = %h (expected %h or %h or %h)',
authKeyId,
this._authKey.id,
this._authKeyTemp.id,
this._authKeyTempSecondary.id,
)
return
}
return key.decryptMessage(data, this._sessionId, callback)
}
writeMessage( writeMessage(
writer: TlBinaryWriter, writer: TlBinaryWriter,
content: tl.TlObject | mtp.TlObject | Buffer, content: tl.TlObject | mtp.TlObject | Buffer,

View file

@ -31,6 +31,7 @@ export class DcConnectionManager {
disableUpdates: this.manager.params.disableUpdates, disableUpdates: this.manager.params.disableUpdates,
readerMap: this.manager.params.readerMap, readerMap: this.manager.params.readerMap,
writerMap: this.manager.params.writerMap, writerMap: this.manager.params.writerMap,
usePfs: this.manager.params.usePfs,
isMainConnection: false, isMainConnection: false,
}) })
@ -44,6 +45,10 @@ export class DcConnectionManager {
) )
} }
/**
* Params passed into {@link NetworkManager} by {@link TelegramClient}.
* This type is intended for internal usage only.
*/
export interface NetworkManagerParams { export interface NetworkManagerParams {
storage: ITelegramStorage storage: ITelegramStorage
crypto: ICryptoProvider crypto: ICryptoProvider
@ -63,6 +68,18 @@ export interface NetworkManagerParams {
writerMap: TlWriterMap writerMap: TlWriterMap
} }
/**
* Additional params passed into {@link NetworkManager} by the user
* that customize the behavior of the manager
*/
export interface NetworkManagerExtraParams {
/**
* Whether to use PFS (Perfect Forward Secrecy) for all connections.
* This is disabled by default
*/
usePfs?: boolean
}
export class NetworkManager { export class NetworkManager {
readonly _log = this.params.log.create('network') readonly _log = this.params.log.create('network')
@ -93,7 +110,7 @@ export class NetworkManager {
} }
constructor( constructor(
readonly params: NetworkManagerParams, readonly params: NetworkManagerParams & NetworkManagerExtraParams,
readonly config: ConfigManager readonly config: ConfigManager
) { ) {
let deviceModel = 'mtcute on ' let deviceModel = 'mtcute on '

View file

@ -30,6 +30,7 @@ let nextConnectionUid = 0
/** /**
* Base class for persistent connections. * Base class for persistent connections.
* Only used for {@link PersistentConnection} and used as a mean of code splitting. * Only used for {@link PersistentConnection} and used as a mean of code splitting.
* This class doesn't know anything about MTProto, it just manages the transport.
*/ */
export abstract class PersistentConnection extends EventEmitter { export abstract class PersistentConnection extends EventEmitter {
private _uid = nextConnectionUid++ private _uid = nextConnectionUid++
@ -49,9 +50,6 @@ export abstract class PersistentConnection extends EventEmitter {
private _inactivityTimeout: NodeJS.Timeout | null = null private _inactivityTimeout: NodeJS.Timeout | null = null
private _inactive = false private _inactive = false
// waitForMessage
private _pendingWaitForMessages: ControllablePromise<Buffer>[] = []
_destroyed = false _destroyed = false
_usable = false _usable = false
@ -91,7 +89,7 @@ export abstract class PersistentConnection extends EventEmitter {
this._transport.setup?.(this.params.crypto, this.log) this._transport.setup?.(this.params.crypto, this.log)
this._transport.on('ready', this.onTransportReady.bind(this)) this._transport.on('ready', this.onTransportReady.bind(this))
this._transport.on('message', this.onTransportMessage.bind(this)) this._transport.on('message', this.onMessage.bind(this))
this._transport.on('error', this.onTransportError.bind(this)) this._transport.on('error', this.onTransportError.bind(this))
this._transport.on('close', this.onTransportClose.bind(this)) this._transport.on('close', this.onTransportClose.bind(this))
} }
@ -119,32 +117,12 @@ export abstract class PersistentConnection extends EventEmitter {
} }
onTransportError(err: Error): void { onTransportError(err: Error): void {
if (this._pendingWaitForMessages.length) {
this._pendingWaitForMessages.shift()!.reject(err)
return
}
this._lastError = err this._lastError = err
this.onError(err) this.onError(err)
// transport is expected to emit `close` after `error` // transport is expected to emit `close` after `error`
} }
onTransportMessage(data: Buffer): void {
if (this._pendingWaitForMessages.length) {
this._pendingWaitForMessages.shift()!.resolve(data)
return
}
this.onMessage(data)
}
onTransportClose(): void { onTransportClose(): void {
Object.values(this._pendingWaitForMessages).forEach((prom) =>
prom.reject(new Error('Connection closed')),
)
// transport closed because of inactivity // transport closed because of inactivity
// obviously we dont want to reconnect then // obviously we dont want to reconnect then
if (this._inactive) return if (this._inactive) return
@ -219,11 +197,4 @@ export abstract class PersistentConnection extends EventEmitter {
this._sendOnceConnected.push(data) this._sendOnceConnected.push(data)
} }
} }
waitForNextMessage(): Promise<Buffer> {
const promise = createControllablePromise<Buffer>()
this._pendingWaitForMessages.push(promise)
return promise
}
} }

View file

@ -24,6 +24,12 @@ import {
randomLong, randomLong,
removeFromLongArray, removeFromLongArray,
SortedArray, SortedArray,
EarlyTimer,
ControllablePromise,
createCancellablePromise,
randomBytes,
longFromBuffer,
createControllablePromise,
} from '../utils' } from '../utils'
import { MtprotoSession, PendingMessage, PendingRpc } from './mtproto-session' import { MtprotoSession, PendingMessage, PendingRpc } from './mtproto-session'
import { doAuthorization } from './authorization' import { doAuthorization } from './authorization'
@ -33,6 +39,9 @@ import {
PersistentConnectionParams, PersistentConnectionParams,
} from './persistent-connection' } from './persistent-connection'
import { TransportError } from './transports' import { TransportError } from './transports'
import { createAesIgeForMessageOld } from '../utils/crypto/mtproto'
const TEMP_AUTH_KEY_EXPIRY = 300 // 86400 fixme
export interface SessionConnectionParams extends PersistentConnectionParams { export interface SessionConnectionParams extends PersistentConnectionParams {
initConnection: tl.RawInitConnectionRequest initConnection: tl.RawInitConnectionRequest
@ -47,9 +56,8 @@ export interface SessionConnectionParams extends PersistentConnectionParams {
writerMap: TlWriterMap writerMap: TlWriterMap
} }
// destroy_session#e7512126 session_id:long // destroy_auth_key#d1435160 = DestroyAuthKeyRes;
// todo const DESTROY_AUTH_KEY = Buffer.from('605134d1', 'hex')
const DESTROY_SESSION_ID = Buffer.from('262151e7', 'hex')
function makeNiceStack( function makeNiceStack(
error: tl.errors.RpcError, error: tl.errors.RpcError,
@ -70,6 +78,12 @@ export class SessionConnection extends PersistentConnection {
private _flushTimer = new EarlyTimer() private _flushTimer = new EarlyTimer()
private _queuedDestroySession: Long[] = [] private _queuedDestroySession: Long[] = []
// waitForMessage
private _pendingWaitForUnencrypted: [
ControllablePromise<Buffer>,
NodeJS.Timeout
][] = []
private _next429Timeout = 1000 private _next429Timeout = 1000
private _current429Timeout?: NodeJS.Timeout private _current429Timeout?: NodeJS.Timeout
@ -112,6 +126,12 @@ export class SessionConnection extends PersistentConnection {
onTransportClose(): void { onTransportClose(): void {
super.onTransportClose() super.onTransportClose()
Object.values(this._pendingWaitForUnencrypted).forEach(([prom, timeout]) => {
prom.reject(new Error('Connection closed'))
clearTimeout(timeout)
})
this.emit('disconnect') this.emit('disconnect')
this.reset() this.reset()
@ -134,11 +154,16 @@ export class SessionConnection extends PersistentConnection {
} }
protected async onConnected(): Promise<void> { protected async onConnected(): Promise<void> {
// check if we have all the needed keys
if (!this._session._authKey.ready) { if (!this._session._authKey.ready) {
this.log.debug('no perm auth key, authorizing...') this.log.info('no perm auth key, authorizing...')
this._authorize() this._authorize()
// todo: if we use pfs, we can also start temp key exchange here
} else if (this.params.usePfs && !this._session._authKeyTemp.ready) {
this.log.info('no perm auth key but using pfs, authorizing')
this._authorizePfs()
} else { } else {
this.log.debug('auth keys are already available') this.log.info('auth keys are already available')
this.onConnectionUsable() this.onConnectionUsable()
} }
} }
@ -149,6 +174,35 @@ export class SessionConnection extends PersistentConnection {
this.log.error('transport error %d', error.code) this.log.error('transport error %d', error.code)
if (error.code === 404) { if (error.code === 404) {
// if we are using pfs, this could be due to the server
// forgetting our temp key (which is kinda weird but expected)
if (this.params.usePfs) {
if (
!this._isPfsBindingPending &&
this._session._authKeyTemp.ready
) {
// this is important! we must reset temp auth key before
// we proceed with new temp key derivation.
// otherwise, we can end up in an infinite loop in case it
// was actually perm_key that got 404-ed
//
// if temp key binding is already in process in background,
// _authorizePfs will mark it as foreground to prevent new
// queries from being sent (to avoid even more 404s)
this._session._authKeyTemp.reset()
this._authorizePfs()
this._onAllFailed('temp key expired, binding started')
return
} else if (this._isPfsBindingPending) {
this._onAllFailed('temp key expired, binding pending')
return
}
// otherwise, 404 must be referencing the perm_key
}
// there happened a little trolling
this._session.reset() this._session.reset()
this.emit('key-change', null) this.emit('key-change', null)
this._authorize() this._authorize()
@ -156,6 +210,10 @@ export class SessionConnection extends PersistentConnection {
return return
} }
this.log.error('transport error %d', error.code)
// all pending queries must be resent
this._onAllFailed(`transport error ${error.code}`)
if (error.code === 429) { if (error.code === 429) {
// all active queries must be resent // all active queries must be resent
const timeout = this._next429Timeout const timeout = this._next429Timeout
@ -172,21 +230,11 @@ export class SessionConnection extends PersistentConnection {
timeout, timeout,
) )
for (const msgId of this._session.pendingMessages.keys()) {
const info = this._session.pendingMessages.get(msgId)!
if (info._ === 'container') {
this._session.pendingMessages.delete(msgId)
} else {
this._onMessageFailed(msgId, 'transport flood', true)
}
}
return return
} }
} }
this.emit('error', error) this.emit('err', error)
} }
protected onConnectionUsable() { protected onConnectionUsable() {
@ -205,7 +253,11 @@ export class SessionConnection extends PersistentConnection {
this.emit('key-change', authKey) this.emit('key-change', authKey)
if (this.params.usePfs) {
return this._authorizePfs()
} else {
this.onConnectionUsable() this.onConnectionUsable()
}
}) })
.catch((err) => { .catch((err) => {
this.log.error('Authorization error: %s', err.message) this.log.error('Authorization error: %s', err.message)
@ -214,7 +266,218 @@ export class SessionConnection extends PersistentConnection {
}) })
} }
private _authorizePfs(background = false): void {
if (this._isPfsBindingPending) return
if (this._pfsUpdateTimeout) {
clearTimeout(this._pfsUpdateTimeout)
this._pfsUpdateTimeout = undefined
}
if (this._isPfsBindingPendingInBackground) {
// e.g. temp key has expired while we were binding a key in the background
// in this case, we shouldn't start pfs binding again, and instead wait for
// current operation to complete
this._isPfsBindingPendingInBackground = false
this._isPfsBindingPending = true
return
}
if (background) {
this._isPfsBindingPendingInBackground = true
} else {
this._isPfsBindingPending = true
}
doAuthorization(this, this.params.crypto, TEMP_AUTH_KEY_EXPIRY)
.then(async ([tempAuthKey, tempServerSalt]) => {
const tempKey = await this._session._authKeyTempSecondary
await tempKey.setup(tempAuthKey)
const msgId = this._session.getMessageId()
this.log.debug(
'binding temp_auth_key (%h) to perm_auth_key (%h), msg_id = %l...',
tempKey.id,
this._session._authKey.id,
msgId
)
// we now need to bind the key
const inner: mtp.RawMt_bind_auth_key_inner = {
_: 'mt_bind_auth_key_inner',
nonce: randomLong(),
tempAuthKeyId: longFromBuffer(tempKey.id),
permAuthKeyId: longFromBuffer(this._session._authKey.id),
tempSessionId: this._session._sessionId,
expiresAt:
Math.floor(Date.now() / 1000) + TEMP_AUTH_KEY_EXPIRY,
}
// encrypt using mtproto v1 (fucking kill me plz)
const writer = TlBinaryWriter.alloc(this.params.writerMap, 80)
// = 40 (inner length) + 32 (mtproto header) + 8 (pad 72 so mod 16 = 0)
writer.raw(randomBytes(16))
writer.long(msgId)
writer.int(0) // seq_no
writer.int(40) // msg_len
writer.object(inner)
const msgWithoutPadding = writer.result()
writer.raw(randomBytes(8))
const msgWithPadding = writer.result()
const hash = await this.params.crypto.sha1(msgWithoutPadding)
const msgKey = hash.slice(4, 20)
const ige = await createAesIgeForMessageOld(
this.params.crypto,
this._session._authKey.key,
msgKey,
true
)
const encryptedData = await ige.encrypt(msgWithPadding)
const encryptedMessage = Buffer.concat([
this._session._authKey.id,
msgKey,
encryptedData,
])
const promise = createControllablePromise()
// encrypt the message using temp key and same msg id
// this is a bit of a hack, but it works
//
// hacking inside main send loop to allow sending
// with another key is just too much hassle.
// we could just always use temp key if one is available,
// but that way we won't be able to refresh the key
// that is about to expire in the background without
// interrupting actual message flow
// decrypting is trivial though, since key id
// is in the first bytes of the message, and is never used later on.
this._session.pendingMessages.set(msgId, {
_: 'bind',
promise,
})
const request: tl.auth.RawBindTempAuthKeyRequest = {
_: 'auth.bindTempAuthKey',
permAuthKeyId: inner.permAuthKeyId,
nonce: inner.nonce,
expiresAt: inner.expiresAt,
encryptedMessage,
}
const reqSize = TlSerializationCounter.countNeededBytes(
this._writerMap,
request
)
const reqWriter = TlBinaryWriter.alloc(
this._writerMap,
reqSize + 16
)
reqWriter.long(this._registerOutgoingMsgId(msgId))
reqWriter.uint(this._session.getSeqNo())
reqWriter.uint(reqSize)
reqWriter.object(request)
// we can now send it as is
const requestEncrypted = await tempKey.encryptMessage(
reqWriter.result(),
tempServerSalt,
this._session._sessionId
)
await this.send(requestEncrypted)
const res: mtp.RawMt_rpc_error | boolean = await promise
this._session.pendingMessages.delete(msgId)
if (typeof res === 'object') {
this.log.error(
'failed to bind temp key: %s:%s',
res.errorCode,
res.errorMessage
)
throw new Error('Failed to bind temporary key')
}
// now we can swap the keys (secondary becomes primary,
// and primary is not immediately forgot because messages using it may still be in flight)
this._session._authKeyTempSecondary = this._session._authKeyTemp
this._session._authKeyTemp = tempKey
this._session.serverSalt = tempServerSalt
this.log.debug(
'temp key has been bound, exp = %d',
inner.expiresAt
)
this._isPfsBindingPending = false
this._isPfsBindingPendingInBackground = false
// we must re-init connection after binding temp key
this._session.initConnectionCalled = false
this.emit('tmp-key-change', tempAuthKey, inner.expiresAt)
this.onConnectionUsable()
// set a timeout to update temp auth key in advance to avoid interruption
this._pfsUpdateTimeout = setTimeout(() => {
this._pfsUpdateTimeout = undefined
this.log.debug('temp key is expiring soon')
this._authorizePfs(true)
}, (TEMP_AUTH_KEY_EXPIRY - 60) * 1000)
})
.catch((err) => {
this.log.error('PFS Authorization error: %s', err.message)
if (this._isPfsBindingPendingInBackground) {
this._isPfsBindingPendingInBackground = false
// if we are in background, we can just retry
return this._authorizePfs(true)
}
this._isPfsBindingPending = false
this.onError(err)
this.reconnect()
})
}
waitForUnencryptedMessage(timeout = 5000): Promise<Buffer> {
const promise = createControllablePromise<Buffer>()
const timeoutId = setTimeout(() => {
promise.reject(new Error('Timeout'))
this._pendingWaitForUnencrypted =
this._pendingWaitForUnencrypted.filter(
(it) => it[0] !== promise
)
}, timeout)
this._pendingWaitForUnencrypted.push([promise, timeoutId])
return promise
}
protected async onMessage(data: Buffer): Promise<void> { protected async onMessage(data: Buffer): Promise<void> {
if (data.readInt32LE(0) === 0 && data.readInt32LE(4) === 0) {
// auth_key_id = 0, meaning it's an unencrypted message used for authorization
if (this._pendingWaitForUnencrypted.length) {
const [promise, timeout] = this._pendingWaitForUnencrypted.shift()!
clearTimeout(timeout)
promise.resolve(data)
} else {
this.log.debug(
'unencrypted message received, but no one is waiting for it'
)
}
return
}
if (!this._session._authKey.ready) { if (!this._session._authKey.ready) {
// if a message is received before authorization, // if a message is received before authorization,
// either the server is misbehaving, // either the server is misbehaving,
@ -225,11 +488,7 @@ export class SessionConnection extends PersistentConnection {
} }
try { try {
await this._session._authKey.decryptMessage( await this._session.decryptMessage(data, this._handleRawMessage)
data,
this._session._sessionId,
this._handleRawMessage
)
} catch (err) { } catch (err) {
this.log.error('failed to decrypt message: %s\ndata: %h', err, data) this.log.error('failed to decrypt message: %s\ndata: %h', err, data)
} }
@ -376,6 +635,10 @@ export class SessionConnection extends PersistentConnection {
message, message,
) )
break break
case 'mt_destroy_session_ok':
case 'mt_destroy_session_none':
this._onDestroySessionResult(message)
break
default: default:
if (tl.isAnyUpdates(message)) { if (tl.isAnyUpdates(message)) {
if (this._usable && this.params.inactivityTimeout) { if (this._usable && this.params.inactivityTimeout) {
@ -386,6 +649,8 @@ export class SessionConnection extends PersistentConnection {
this.log.warn( this.log.warn(
'received updates, but updates are disabled' 'received updates, but updates are disabled'
) )
// likely due to some request in the session missing invokeWithoutUpdates
// todo: reset session
break break
} }
if (!this.params.isMainConnection) { if (!this.params.isMainConnection) {
@ -457,7 +722,13 @@ export class SessionConnection extends PersistentConnection {
return return
} }
// special case for auth key binding
if (msg._ !== 'rpc') { if (msg._ !== 'rpc') {
if (msg._ === 'bind') {
msg.promise.resolve(result)
return
}
this.log.error( this.log.error(
'received rpc_result for %s request %l', 'received rpc_result for %s request %l',
msg._, msg._,
@ -466,6 +737,7 @@ export class SessionConnection extends PersistentConnection {
return return
} }
const rpc = msg.rpc const rpc = msg.rpc
const customReader = this._readerMap._results![rpc.method] const customReader = this._readerMap._results![rpc.method]
@ -475,7 +747,9 @@ export class SessionConnection extends PersistentConnection {
// initConnection call was definitely received and // initConnection call was definitely received and
// processed by the server, so we no longer need to use it // processed by the server, so we no longer need to use it
if (rpc.initConn) this._session.initConnectionCalled = true if (rpc.initConn) {
this._session.initConnectionCalled = true
}
this.log.verbose('<<< (%s) %j', rpc.method, result) this.log.verbose('<<< (%s) %j', rpc.method, result)
@ -489,6 +763,44 @@ export class SessionConnection extends PersistentConnection {
rpc.method, rpc.method,
) )
if (res.errorMessage === 'AUTH_KEY_PERM_EMPTY') {
// happens when temp auth key is not yet bound
// this shouldn't happen as we block any outbound communications
// until the temp key is derived and bound.
//
// i think it is also possible for the error to be returned
// when the temp key has expired, but this still shouldn't happen
// but this is tg, so something may go wrong, and we will receive this as an error
// (for god's sake why is this not in mtproto and instead hacked into the app layer)
this._authorizePfs()
this._onMessageFailed(reqMsgId, 'AUTH_KEY_PERM_EMPTY', true)
return
}
if (res.errorMessage === 'CONNECTION_NOT_INITED') {
// this seems to sometimes happen when using pfs
// no idea why, but tdlib also seems to handle these, so whatever
this._session.initConnectionCalled = false
this._onMessageFailed(reqMsgId, res.errorMessage, true)
// just setting this flag is not enough because the message
// is already serialized, so we do this awesome hack
this.sendRpc({ _: 'help.getNearestDc' })
.then(() => {
this.log.debug(
'additional help.getNearestDc for initConnection ok'
)
})
.catch((err) => {
this.log.debug(
'additional help.getNearestDc for initConnection error: %s',
err
)
})
return
}
if (rpc.cancelled) return if (rpc.cancelled) return
const error = tl.errors.createRpcErrorFromTl(res) const error = tl.errors.createRpcErrorFromTl(res)
@ -567,6 +879,8 @@ export class SessionConnection extends PersistentConnection {
this._session.getStateSchedule.remove(rpc) this._session.getStateSchedule.remove(rpc)
break break
} }
case 'bind':
break // do nothing, wait for the result
default: default:
if (!inContainer) { if (!inContainer) {
this.log.warn( this.log.warn(
@ -578,6 +892,31 @@ export class SessionConnection extends PersistentConnection {
} }
} }
private _onAllFailed(reason: string) {
// called when all the pending messages are to be resent
// e.g. when server returns 429
// most service messages can be omitted as stale
this._resetLastPing(true)
for (const msgId of this._session.pendingMessages.keys()) {
const info = this._session.pendingMessages.get(msgId)!
switch (info._) {
case 'container':
case 'state':
case 'resend':
case 'ping':
// no longer relevant
this._session.pendingMessages.delete(msgId)
break
default:
this._onMessageFailed(msgId, reason, true)
break
}
}
}
private _onMessageFailed( private _onMessageFailed(
msgId: Long, msgId: Long,
reason: string, reason: string,
@ -669,6 +1008,13 @@ export class SessionConnection extends PersistentConnection {
this._session.queuedStateReq.splice(0, 0, ...msgInfo.msgIds) this._session.queuedStateReq.splice(0, 0, ...msgInfo.msgIds)
this._flushTimer.emitWhenIdle() this._flushTimer.emitWhenIdle()
break break
case 'bind':
this.log.debug(
'temp key binding request %l failed because of %s, retrying',
msgId,
reason
)
msgInfo.promise.reject(Error(reason))
} }
this._session.pendingMessages.delete(msgId) this._session.pendingMessages.delete(msgId)
@ -778,6 +1124,8 @@ export class SessionConnection extends PersistentConnection {
// something went very wrong, we need to reset the session // something went very wrong, we need to reset the session
this.log.error( this.log.error(
'received bad_msg_notification for msg_id = %l, code = %d. session will be reset', 'received bad_msg_notification for msg_id = %l, code = %d. session will be reset',
msg.badMsgId,
msg.errorCode
) )
this._resetSession() this._resetSession()
break break
@ -819,6 +1167,14 @@ export class SessionConnection extends PersistentConnection {
for (const msgId of this._session.pendingMessages.keys()) { for (const msgId of this._session.pendingMessages.keys()) {
const val = this._session.pendingMessages.get(msgId)! const val = this._session.pendingMessages.get(msgId)!
if (val._ === 'bind') {
// should NOT happen.
if (msgId.lt(firstMsgId)) {
this._onMessageFailed(msgId, 'received in wrong session')
}
continue
}
if (val._ === 'container') { if (val._ === 'container') {
if (msgId.lt(firstMsgId)) { if (msgId.lt(firstMsgId)) {
// all messages in this container will be resent // all messages in this container will be resent
@ -942,6 +1298,24 @@ export class SessionConnection extends PersistentConnection {
this._onMessagesInfo(info.msgIds, msg.info) this._onMessagesInfo(info.msgIds, msg.info)
} }
private _onDestroySessionResult(msg: mtp.TypeDestroySessionRes): void {
const reqMsgId = this._session.destroySessionIdToMsgId.get(
msg.sessionId
)
if (!reqMsgId) {
this.log.warn(
'received %s for unknown session %h',
msg._,
msg.sessionId
)
return
}
this._session.destroySessionIdToMsgId.delete(msg.sessionId)
this._session.pendingMessages.delete(reqMsgId)
this.log.debug('received %s for session %h', msg._, msg.sessionId)
}
private _enqueueRpc(rpc: PendingRpc, force?: boolean) { private _enqueueRpc(rpc: PendingRpc, force?: boolean) {
if (this._session.enqueueRpc(rpc, force)) if (this._session.enqueueRpc(rpc, force))
this._flushTimer.emitWhenIdle() this._flushTimer.emitWhenIdle()
@ -1133,7 +1507,11 @@ export class SessionConnection extends PersistentConnection {
} }
private _flush(): void { private _flush(): void {
if (!this._session._authKey.ready || this._current429Timeout) { if (
!this._session._authKey.ready ||
this._isPfsBindingPending ||
this._current429Timeout
) {
// it will be flushed once connection is usable // it will be flushed once connection is usable
return return
} }
@ -1362,7 +1740,7 @@ export class SessionConnection extends PersistentConnection {
const otherPendings: Exclude< const otherPendings: Exclude<
PendingMessage, PendingMessage,
{ _: 'rpc' | 'container' } { _: 'rpc' | 'container' | 'bind' }
>[] = [] >[] = []
if (ackRequest) { if (ackRequest) {
@ -1445,6 +1823,7 @@ export class SessionConnection extends PersistentConnection {
containerId: msgId, containerId: msgId,
} }
this._session.pendingMessages.set(msgId, pending) this._session.pendingMessages.set(msgId, pending)
this._session.destroySessionIdToMsgId.set(sessionId, msgId)
otherPendings.push(pending) otherPendings.push(pending)
}) })
} }
@ -1561,12 +1940,8 @@ export class SessionConnection extends PersistentConnection {
rootMsgId, rootMsgId,
) )
this._session._authKey this._session
.encryptMessage( .encryptMessage(result)
result,
this._session.serverSalt,
this._session._sessionId
)
.then((enc) => this.send(enc)) .then((enc) => this.send(enc))
.catch((err) => { .catch((err) => {
this.log.error( this.log.error(

View file

@ -78,12 +78,27 @@ export interface ITelegramStorage {
/** /**
* Get auth_key for a given DC * Get auth_key for a given DC
* (returning null will start authorization) * (returning null will start authorization)
* For temp keys: should also return null if the key has expired
*
* @param dcId DC ID
* @param tempIndex Index of the temporary key (usually 0, used for multi-connections)
*/ */
getAuthKeyFor(dcId: number): MaybeAsync<Buffer | null> getAuthKeyFor(dcId: number, tempIndex?: number): MaybeAsync<Buffer | null>
/** /**
* Set auth_key for a given DC * Set auth_key for a given DC
*/ */
setAuthKeyFor(dcId: number, key: Buffer | null): MaybeAsync<void> setAuthKeyFor(dcId: number, key: Buffer | null): MaybeAsync<void>
/**
* Set temp_auth_key for a given DC
* expiresAt is unix time in ms
*/
setTempAuthKeyFor(dcId: number, index: number, key: Buffer | null, expiresAt: number): MaybeAsync<void>
/**
* Remove all saved auth keys (both temp and perm)
* for the given DC. Used when perm_key becomes invalid,
* meaning all temp_keys also become invalid
*/
dropAuthKeysFor(dcId: number): MaybeAsync<void>
/** /**
* Get information about currently logged in user (if available) * Get information about currently logged in user (if available)

View file

@ -15,6 +15,8 @@ export interface MemorySessionState {
defaultDc: tl.RawDcOption | null defaultDc: tl.RawDcOption | null
authKeys: Record<number, Buffer | null> authKeys: Record<number, Buffer | null>
authKeysTemp: Record<string, Buffer | null>
authKeysTempExpiry: Record<string, number>
// marked peer id -> entity info // marked peer id -> entity info
entities: Record<number, PeerInfoWithUpdated> entities: Record<number, PeerInfoWithUpdated>
@ -110,6 +112,8 @@ export class MemoryStorage implements ITelegramStorage, IStateStorage {
$version: CURRENT_VERSION, $version: CURRENT_VERSION,
defaultDc: null, defaultDc: null,
authKeys: {}, authKeys: {},
authKeysTemp: {},
authKeysTempExpiry: {},
entities: {}, entities: {},
phoneIndex: {}, phoneIndex: {},
usernameIndex: {}, usernameIndex: {},
@ -187,14 +191,43 @@ export class MemoryStorage implements ITelegramStorage, IStateStorage {
this._state.defaultDc = dc this._state.defaultDc = dc
} }
setTempAuthKeyFor(
dcId: number,
index: number,
key: Buffer | null,
expiresAt: number
): void {
const k = `${dcId}:${index}`
this._state.authKeysTemp[k] = key
this._state.authKeysTempExpiry[k] = expiresAt
}
setAuthKeyFor(dcId: number, key: Buffer | null): void { setAuthKeyFor(dcId: number, key: Buffer | null): void {
this._state.authKeys[dcId] = key this._state.authKeys[dcId] = key
} }
getAuthKeyFor(dcId: number): Buffer | null { getAuthKeyFor(dcId: number, tempIndex?: number): Buffer | null {
if (tempIndex !== undefined) {
const k = `${dcId}:${tempIndex}`
if (Date.now() > (this._state.authKeysTempExpiry[k] ?? 0))
return null
return this._state.authKeysTemp[k]
}
return this._state.authKeys[dcId] ?? null return this._state.authKeys[dcId] ?? null
} }
dropAuthKeysFor(dcId: number): void {
this._state.authKeys[dcId] = null
Object.keys(this._state.authKeysTemp).forEach((key) => {
if (key.startsWith(`${dcId}:`)) {
delete this._state.authKeysTemp[key]
delete this._state.authKeysTempExpiry[key]
}
})
}
updatePeers(peers: PeerInfoWithUpdated[]): MaybeAsync<void> { updatePeers(peers: PeerInfoWithUpdated[]): MaybeAsync<void> {
for (const peer of peers) { for (const peer of peers) {
this._cachedFull.set(peer.id, peer.full) this._cachedFull.set(peer.id, peer.full)

View file

@ -16,6 +16,21 @@ export function randomLong(unsigned = false): Long {
return new Long(lo, hi, unsigned) return new Long(lo, hi, unsigned)
} }
/**
* Read a Long from a buffer
*
* @param buf Buffer to read from
* @param unsigned Whether the number should be unsigned
* @param le Whether the number is little-endian
*/
export function longFromBuffer(buf: Buffer, unsigned = false, le = true): Long {
if (le) {
return new Long(buf.readInt32LE(0), buf.readInt32LE(4), unsigned)
} else {
return new Long(buf.readInt32BE(4), buf.readInt32BE(0), unsigned)
}
}
/** /**
* Remove a Long from an array * Remove a Long from an array
* *

View file

@ -54,37 +54,45 @@ function getInputPeer(
throw new Error(`Invalid peer type: ${row.type}`) throw new Error(`Invalid peer type: ${row.type}`)
} }
const CURRENT_VERSION = 2 const CURRENT_VERSION = 3
// language=SQLite // language=SQLite format=false
const TEMP_AUTH_TABLE = `
create table temp_auth_keys (
dc integer not null,
idx integer not null,
key blob not null,
expires integer not null,
primary key (dc, idx)
);
`
// language=SQLite format=false
const SCHEMA = ` const SCHEMA = `
create table kv create table kv (
(
key text primary key, key text primary key,
value text not null value text not null
); );
create table state create table state (
(
key text primary key, key text primary key,
value text not null, value text not null,
expires number expires number
); );
create table auth_keys create table auth_keys (
(
dc integer primary key, dc integer primary key,
key blob not null key blob not null
); );
create table pts ${TEMP_AUTH_TABLE}
(
create table pts (
channel_id integer primary key, channel_id integer primary key,
pts integer not null pts integer not null
); );
create table entities create table entities (
(
id integer primary key, id integer primary key,
hash text not null, hash text not null,
type text not null, type text not null,
@ -97,18 +105,13 @@ const SCHEMA = `
create index idx_entities_phone on entities (phone); create index idx_entities_phone on entities (phone);
` `
// language=SQLite format=false
const RESET = ` const RESET = `
delete delete from kv where key <> 'ver';
from kv delete from state;
where key <> 'ver'; delete from auth_keys;
delete delete from pts;
from state; delete from entities
delete
from auth_keys;
delete
from pts;
delete
from entities
` `
const USERNAME_TTL = 86400000 // 24 hours const USERNAME_TTL = 86400000 // 24 hours
@ -144,8 +147,14 @@ const STATEMENTS = {
delState: 'delete from state where key = ?', delState: 'delete from state where key = ?',
getAuth: 'select key from auth_keys where dc = ?', getAuth: 'select key from auth_keys where dc = ?',
getAuthTemp:
'select key from temp_auth_keys where dc = ? and idx = ? and expires > ?',
setAuth: 'insert or replace into auth_keys (dc, key) values (?, ?)', setAuth: 'insert or replace into auth_keys (dc, key) values (?, ?)',
setAuthTemp:
'insert or replace into temp_auth_keys (dc, idx, key, expires) values (?, ?, ?, ?)',
delAuth: 'delete from auth_keys where dc = ?', delAuth: 'delete from auth_keys where dc = ?',
delAuthTemp: 'delete from temp_auth_keys where dc = ? and idx = ?',
delAllAuthTemp: 'delete from temp_auth_keys where dc = ?',
getPts: 'select pts from pts where channel_id = ?', getPts: 'select pts from pts where channel_id = ?',
setPts: 'insert or replace into pts (channel_id, pts) values (?, ?)', setPts: 'insert or replace into pts (channel_id, pts) values (?, ?)',
@ -376,6 +385,17 @@ export class SqliteStorage implements ITelegramStorage, IStateStorage {
'Unsupported session version, please migrate manually', 'Unsupported session version, please migrate manually',
) )
} }
if (from === 2) {
// PFS support added
this._db.exec(TEMP_AUTH_TABLE)
from = 3
}
if (from !== CURRENT_VERSION) {
// an assertion just in case i messed up
throw new Error('Migration incomplete')
}
} }
private _initializeStatements(): void { private _initializeStatements(): void {
@ -481,10 +501,15 @@ export class SqliteStorage implements ITelegramStorage, IStateStorage {
return this._getFromKv('def_dc') return this._getFromKv('def_dc')
} }
getAuthKeyFor(dcId: number): Buffer | null { getAuthKeyFor(dcId: number, tempIndex?: number): Promise<Buffer | null> {
const row = this._statements.getAuth.get(dcId) let row
if (tempIndex !== undefined) {
row = this._statements.getAuthTemp.get(dcId, tempIndex, Date.now())
} else {
row = this._statements.getAuth.get(dcId)
}
return row ? (row as { key: Buffer }).key : null return row ? row.key : null
} }
setAuthKeyFor(dcId: number, key: Buffer | null): void { setAuthKeyFor(dcId: number, key: Buffer | null): void {
@ -494,6 +519,27 @@ export class SqliteStorage implements ITelegramStorage, IStateStorage {
]) ])
} }
setTempAuthKeyFor(
dcId: number,
index: number,
key: Buffer | null,
expires: number
): void {
this._pending.push([
key === null
? this._statements.delAuthTemp
: this._statements.setAuthTemp,
key === null ? [dcId, index] : [dcId, index, key, expires],
])
}
dropAuthKeysFor(dcId: number): void {
this._pending.push(
[this._statements.delAuth, [dcId]],
[this._statements.delAllAuthTemp, [dcId]]
)
}
getSelf(): ITelegramStorage.SelfInfo | null { getSelf(): ITelegramStorage.SelfInfo | null {
return this._getFromKv('self') return this._getFromKv('self')
} }