feat(core): sync keys between connections, exclusive lock for auth
in other words, only one main connection does authorization, others wait
This commit is contained in:
parent
bd5130c77b
commit
f68d83da06
5 changed files with 225 additions and 88 deletions
|
@ -123,6 +123,7 @@ export class MtprotoSession {
|
|||
destroySessionIdToMsgId = new LongMap<Long>()
|
||||
|
||||
initConnectionCalled = false
|
||||
authorizationPending = false
|
||||
|
||||
constructor(
|
||||
readonly _crypto: ICryptoProvider,
|
||||
|
@ -136,10 +137,12 @@ export class MtprotoSession {
|
|||
/**
|
||||
* Reset session by resetting auth key(s) and session state
|
||||
*/
|
||||
reset(): void {
|
||||
this._authKey.reset()
|
||||
this._authKeyTemp.reset()
|
||||
this._authKeyTempSecondary.reset()
|
||||
reset(withAuthKey = false): void {
|
||||
if (withAuthKey) {
|
||||
this._authKey.reset()
|
||||
this._authKeyTemp.reset()
|
||||
this._authKeyTempSecondary.reset()
|
||||
}
|
||||
|
||||
this.resetState()
|
||||
}
|
||||
|
|
|
@ -70,14 +70,18 @@ export class MultiSessionConnection extends EventEmitter {
|
|||
}
|
||||
|
||||
while (this._sessions.length < this._count) {
|
||||
this._sessions.push(
|
||||
new MtprotoSession(
|
||||
this.params.crypto,
|
||||
this._log.create('session'),
|
||||
this.params.readerMap,
|
||||
this.params.writerMap
|
||||
)
|
||||
const idx = this._sessions.length
|
||||
const session = new MtprotoSession(
|
||||
this.params.crypto,
|
||||
this._log.create('session'),
|
||||
this.params.readerMap,
|
||||
this.params.writerMap
|
||||
)
|
||||
|
||||
// brvh
|
||||
if (idx !== 0) session._authKey = this._sessions[0]._authKey
|
||||
|
||||
this._sessions.push(session)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -101,26 +105,49 @@ export class MultiSessionConnection extends EventEmitter {
|
|||
const session = this.params.isMainConnection
|
||||
? this._sessions[i]
|
||||
: this._sessions[0]
|
||||
const conn = new SessionConnection(this.params, session)
|
||||
const conn = new SessionConnection(
|
||||
{
|
||||
...this.params,
|
||||
isMainConnection: this.params.isMainConnection && i === 0,
|
||||
},
|
||||
session
|
||||
)
|
||||
|
||||
conn.on('update', (update) => this.emit('update', update))
|
||||
conn.on('error', (err) => this.emit('error', err, conn))
|
||||
conn.on('key-change', (key) => this.emit('key-change', i, key))
|
||||
conn.on('key-change', (key) => {
|
||||
this.emit('key-change', i, key)
|
||||
|
||||
// notify other connections
|
||||
for (const conn_ of this._connections) {
|
||||
if (conn_ === conn) continue
|
||||
conn_.onConnected()
|
||||
}
|
||||
})
|
||||
conn.on('tmp-key-change', (key, expires) =>
|
||||
this.emit('tmp-key-change', i, key, expires)
|
||||
)
|
||||
conn.on('auth-begin', () => {
|
||||
this._log.debug('received auth-begin from connection %d', i)
|
||||
this.emit('auth-begin', i)
|
||||
|
||||
// we need to reset temp auth keys if there are any left
|
||||
|
||||
this._connections.forEach((conn_) => {
|
||||
conn_._session._authKeyTemp.reset()
|
||||
if (conn_ !== conn) conn_.reconnect()
|
||||
})
|
||||
})
|
||||
conn.on('usable', () => this.emit('usable', i))
|
||||
conn.on('request-auth', () => this.emit('request-auth', i))
|
||||
|
||||
this._connections.push(conn)
|
||||
}
|
||||
}
|
||||
|
||||
destroy(): void {
|
||||
for (const conn of this._connections) {
|
||||
conn.destroy()
|
||||
}
|
||||
for (const session of this._sessions) {
|
||||
session.reset()
|
||||
}
|
||||
this._connections.forEach((conn) => conn.destroy())
|
||||
this._sessions.forEach((sess) => sess.reset())
|
||||
}
|
||||
|
||||
private _nextConnection = 0
|
||||
|
@ -168,9 +195,17 @@ export class MultiSessionConnection extends EventEmitter {
|
|||
}
|
||||
}
|
||||
|
||||
async setAuthKey(authKey: Buffer | null, temp = false, idx = 0): Promise<void> {
|
||||
async setAuthKey(
|
||||
authKey: Buffer | null,
|
||||
temp = false,
|
||||
idx = 0
|
||||
): Promise<void> {
|
||||
const session = this._sessions[idx]
|
||||
const key = temp ? session._authKeyTemp : session._authKey
|
||||
await key.setup(authKey)
|
||||
}
|
||||
|
||||
requestAuth(): void {
|
||||
this._connections[0]._authorize()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime'
|
||||
import { tl } from '@mtcute/tl'
|
||||
|
||||
import { createControllablePromise, ICryptoProvider, Logger } from '../utils'
|
||||
import { ICryptoProvider, Logger } from '../utils'
|
||||
import { defaultTransportFactory, TransportFactory } from './transports'
|
||||
import {
|
||||
defaultReconnectionStrategy,
|
||||
|
@ -10,66 +10,13 @@ import {
|
|||
import { PersistentConnectionParams } from './persistent-connection'
|
||||
import { ConfigManager } from './config-manager'
|
||||
import { MultiSessionConnection } from './multi-session-connection'
|
||||
import { SessionConnection, SessionConnectionParams } from "./session-connection";
|
||||
import {
|
||||
SessionConnection,
|
||||
SessionConnectionParams,
|
||||
} from './session-connection'
|
||||
import { ITelegramStorage } from '../storage'
|
||||
|
||||
export class DcConnectionManager {
|
||||
private __baseConnectionParams = (): SessionConnectionParams => ({
|
||||
crypto: this.manager.params.crypto,
|
||||
initConnection: this.manager._initConnectionParams,
|
||||
transportFactory: this.manager._transportFactory,
|
||||
dc: this._dc,
|
||||
testMode: this.manager.params.testMode,
|
||||
reconnectionStrategy: this.manager._reconnectionStrategy,
|
||||
layer: this.manager.params.layer,
|
||||
disableUpdates: this.manager.params.disableUpdates,
|
||||
readerMap: this.manager.params.readerMap,
|
||||
writerMap: this.manager.params.writerMap,
|
||||
usePfs: this.manager.params.usePfs,
|
||||
isMainConnection: false,
|
||||
})
|
||||
|
||||
mainConnection = new MultiSessionConnection(
|
||||
{
|
||||
...this.__baseConnectionParams(),
|
||||
isMainConnection: true,
|
||||
},
|
||||
1,
|
||||
this.manager._log
|
||||
)
|
||||
|
||||
constructor(
|
||||
readonly manager: NetworkManager,
|
||||
readonly dcId: number,
|
||||
private _dc: tl.RawDcOption
|
||||
) {
|
||||
this._setupStorageHandlers(this.mainConnection)
|
||||
}
|
||||
|
||||
private _setupStorageHandlers(connection: MultiSessionConnection): void {
|
||||
connection.on('key-change', (idx, key) => {
|
||||
this.manager._log.debug('key change for dc %d from connection %d', this.dcId, idx)
|
||||
this.manager._storage.setAuthKeyFor(this.dcId, key)
|
||||
})
|
||||
connection.on('tmp-key-change', (idx, key, expires) => {
|
||||
this.manager._log.debug('temp key change for dc %d from connection %d', this.dcId, idx)
|
||||
this.manager._storage.setTempAuthKeyFor(this.dcId, idx, key, expires * 1000)
|
||||
})
|
||||
}
|
||||
|
||||
async loadKeys(): Promise<void> {
|
||||
const permanent = await this.manager._storage.getAuthKeyFor(this.dcId)
|
||||
|
||||
await this.mainConnection.setAuthKey(permanent)
|
||||
|
||||
if (this.manager.params.usePfs) {
|
||||
for (let i = 0; i < this.mainConnection._sessions.length; i++) {
|
||||
const temp = await this.manager._storage.getAuthKeyFor(this.dcId, i)
|
||||
await this.mainConnection.setAuthKey(temp, true, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
export type ConnectionKind = 'main' | 'upload' | 'download' | 'download-small'
|
||||
|
||||
/**
|
||||
* Params passed into {@link NetworkManager} by {@link TelegramClient}.
|
||||
|
@ -105,6 +52,126 @@ export interface NetworkManagerExtraParams {
|
|||
* This is disabled by default
|
||||
*/
|
||||
usePfs?: boolean
|
||||
|
||||
/**
|
||||
* Connection count for each connection kind
|
||||
*/
|
||||
connectionCount?: Partial<Record<ConnectionKind, number>>
|
||||
}
|
||||
|
||||
export class DcConnectionManager {
|
||||
private __baseConnectionParams = (): SessionConnectionParams => ({
|
||||
crypto: this.manager.params.crypto,
|
||||
initConnection: this.manager._initConnectionParams,
|
||||
transportFactory: this.manager._transportFactory,
|
||||
dc: this._dc,
|
||||
testMode: this.manager.params.testMode,
|
||||
reconnectionStrategy: this.manager._reconnectionStrategy,
|
||||
layer: this.manager.params.layer,
|
||||
disableUpdates: this.manager.params.disableUpdates,
|
||||
readerMap: this.manager.params.readerMap,
|
||||
writerMap: this.manager.params.writerMap,
|
||||
usePfs: this.manager.params.usePfs,
|
||||
isMainConnection: false,
|
||||
})
|
||||
|
||||
mainConnection = new MultiSessionConnection(
|
||||
{
|
||||
...this.__baseConnectionParams(),
|
||||
isMainConnection: true,
|
||||
},
|
||||
this.manager.params.connectionCount?.main ?? 1,
|
||||
this.manager._log
|
||||
)
|
||||
|
||||
constructor(
|
||||
readonly manager: NetworkManager,
|
||||
readonly dcId: number,
|
||||
private _dc: tl.RawDcOption
|
||||
) {
|
||||
this._setupMulti(this.mainConnection, 'main')
|
||||
}
|
||||
|
||||
private _setupMulti(
|
||||
connection: MultiSessionConnection,
|
||||
kind: ConnectionKind
|
||||
): void {
|
||||
connection.on('key-change', (idx, key) => {
|
||||
if (kind !== 'main') {
|
||||
// main connection is responsible for authorization,
|
||||
// and keys are then sent to other connections
|
||||
this.manager._log.warn(
|
||||
'got key-change from non-main connection'
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
this.manager._log.debug(
|
||||
'key change for dc %d from connection %d',
|
||||
this.dcId,
|
||||
idx
|
||||
)
|
||||
this.manager._storage.setAuthKeyFor(this.dcId, key)
|
||||
|
||||
// send key to other connections
|
||||
// todo
|
||||
})
|
||||
connection.on('tmp-key-change', (idx, key, expires) => {
|
||||
if (kind !== 'main') {
|
||||
this.manager._log.warn(
|
||||
'got tmp-key-change from non-main connection'
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
this.manager._log.debug(
|
||||
'temp key change for dc %d from connection %d',
|
||||
this.dcId,
|
||||
idx
|
||||
)
|
||||
this.manager._storage.setTempAuthKeyFor(
|
||||
this.dcId,
|
||||
idx,
|
||||
key,
|
||||
expires * 1000
|
||||
)
|
||||
})
|
||||
|
||||
connection.on('auth-begin', () => {
|
||||
// we need to propagate auth-begin to all connections
|
||||
// to avoid them sending requests before auth is complete
|
||||
if (kind !== 'main') {
|
||||
this.manager._log.warn(
|
||||
'got auth-begin from non-main connection'
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
// reset key on non-main connections
|
||||
// this event was already propagated to additional main connections
|
||||
// todo
|
||||
})
|
||||
|
||||
connection.on('request-auth', () => {
|
||||
this.mainConnection.requestAuth()
|
||||
})
|
||||
}
|
||||
|
||||
async loadKeys(): Promise<void> {
|
||||
const permanent = await this.manager._storage.getAuthKeyFor(this.dcId)
|
||||
|
||||
await this.mainConnection.setAuthKey(permanent)
|
||||
|
||||
if (this.manager.params.usePfs) {
|
||||
for (let i = 0; i < this.mainConnection._sessions.length; i++) {
|
||||
const temp = await this.manager._storage.getAuthKeyFor(
|
||||
this.dcId,
|
||||
i
|
||||
)
|
||||
await this.mainConnection.setAuthKey(temp, true, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export class NetworkManager {
|
||||
|
|
|
@ -66,6 +66,7 @@ export abstract class PersistentConnection extends EventEmitter {
|
|||
super()
|
||||
this.params = params
|
||||
this.changeTransport(params.transportFactory)
|
||||
this._updateLogPrefix()
|
||||
}
|
||||
|
||||
private _updateLogPrefix() {
|
||||
|
|
|
@ -153,26 +153,34 @@ export class SessionConnection extends PersistentConnection {
|
|||
}
|
||||
}
|
||||
|
||||
protected async onConnected(): Promise<void> {
|
||||
onConnected(): void {
|
||||
// check if we have all the needed keys
|
||||
if (!this._session._authKey.ready) {
|
||||
if (!this.params.isMainConnection) {
|
||||
this.log.info('no auth key, waiting for main connection')
|
||||
// once it is done, we will be notified
|
||||
return
|
||||
}
|
||||
|
||||
this.log.info('no perm auth key, authorizing...')
|
||||
this._authorize()
|
||||
// todo: if we use pfs, we can also start temp key exchange here
|
||||
} else if (this.params.usePfs && !this._session._authKeyTemp.ready) {
|
||||
return
|
||||
}
|
||||
|
||||
if (this.params.usePfs && !this._session._authKeyTemp.ready) {
|
||||
this.log.info('no temp auth key but using pfs, authorizing')
|
||||
this._authorizePfs()
|
||||
} else {
|
||||
this.log.info('auth keys are already available')
|
||||
this.onConnectionUsable()
|
||||
return
|
||||
}
|
||||
|
||||
this.log.info('auth keys are already available')
|
||||
this.onConnectionUsable()
|
||||
}
|
||||
|
||||
protected onError(error: Error): void {
|
||||
// https://core.telegram.org/mtproto/mtproto-_transports#_transport-errors
|
||||
if (error instanceof TransportError) {
|
||||
this.log.error('transport error %d', error.code)
|
||||
|
||||
if (error.code === 404) {
|
||||
// if we are using pfs, this could be due to the server
|
||||
// forgetting our temp key (which is kinda weird but expected)
|
||||
|
@ -182,6 +190,8 @@ export class SessionConnection extends PersistentConnection {
|
|||
!this._isPfsBindingPending &&
|
||||
this._session._authKeyTemp.ready
|
||||
) {
|
||||
this.log.info('transport error 404, reauthorizing pfs')
|
||||
|
||||
// this is important! we must reset temp auth key before
|
||||
// we proceed with new temp key derivation.
|
||||
// otherwise, we can end up in an infinite loop in case it
|
||||
|
@ -195,15 +205,18 @@ export class SessionConnection extends PersistentConnection {
|
|||
this._onAllFailed('temp key expired, binding started')
|
||||
return
|
||||
} else if (this._isPfsBindingPending) {
|
||||
this.log.info('transport error 404, pfs binding in progress')
|
||||
|
||||
this._onAllFailed('temp key expired, binding pending')
|
||||
return
|
||||
}
|
||||
|
||||
// otherwise, 404 must be referencing the perm_key
|
||||
this.log.info('transport error 404, reauthorizing')
|
||||
}
|
||||
|
||||
// there happened a little trolling
|
||||
this._session.reset()
|
||||
this._session.reset(true)
|
||||
this.emit('key-change', null)
|
||||
this._authorize()
|
||||
|
||||
|
@ -244,13 +257,30 @@ export class SessionConnection extends PersistentConnection {
|
|||
this._flushTimer.emitBeforeNext(1000)
|
||||
}
|
||||
|
||||
private _authorize(): void {
|
||||
_authorize(): void {
|
||||
if (this._session.authorizationPending) {
|
||||
this.log.info('_authorize(): authorization already in progress')
|
||||
return
|
||||
}
|
||||
|
||||
if (!this.params.isMainConnection) {
|
||||
// we don't authorize on non-main connections
|
||||
this.log.debug('_authorize(): non-main connection, requesting...')
|
||||
this.emit('request-auth')
|
||||
return
|
||||
}
|
||||
|
||||
this._session.authorizationPending = true
|
||||
this.emit('auth-begin')
|
||||
|
||||
doAuthorization(this, this.params.crypto)
|
||||
.then(async ([authKey, serverSalt, timeOffset]) => {
|
||||
await this._session._authKey.setup(authKey)
|
||||
this._session.serverSalt = serverSalt
|
||||
this._session._timeOffset = timeOffset
|
||||
|
||||
this._session.authorizationPending = false
|
||||
|
||||
this.emit('key-change', authKey)
|
||||
|
||||
if (this.params.usePfs) {
|
||||
|
@ -260,6 +290,7 @@ export class SessionConnection extends PersistentConnection {
|
|||
}
|
||||
})
|
||||
.catch((err) => {
|
||||
this._session.authorizationPending = false
|
||||
this.log.error('Authorization error: %s', err.message)
|
||||
this.onError(err)
|
||||
this.reconnect()
|
||||
|
|
Loading…
Reference in a new issue