feat(core)!: future salts storage
breaking: `ITelegramStorage` interface changed
This commit is contained in:
parent
f4ee1f599f
commit
56c2085190
12 changed files with 144 additions and 21 deletions
|
@ -171,6 +171,7 @@ export class MultiSessionConnection extends EventEmitter {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
conn.on('tmp-key-change', (key, expires) => this.emit('tmp-key-change', i, key, expires))
|
conn.on('tmp-key-change', (key, expires) => this.emit('tmp-key-change', i, key, expires))
|
||||||
|
conn.on('future-salts', (salts) => this.emit('future-salts', salts))
|
||||||
conn.on('auth-begin', () => {
|
conn.on('auth-begin', () => {
|
||||||
this._log.debug('received auth-begin from connection %d', i)
|
this._log.debug('received auth-begin from connection %d', i)
|
||||||
this.emit('auth-begin', i)
|
this.emit('auth-begin', i)
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
import { tl } from '@mtcute/tl'
|
import { mtp, tl } from '@mtcute/tl'
|
||||||
import { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime'
|
import { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime'
|
||||||
|
|
||||||
import { ITelegramStorage } from '../storage/index.js'
|
import { ITelegramStorage } from '../storage/index.js'
|
||||||
|
@ -302,6 +302,11 @@ export class DcConnectionManager {
|
||||||
})
|
})
|
||||||
.catch((e: Error) => this.manager.params._emitError(e))
|
.catch((e: Error) => this.manager.params._emitError(e))
|
||||||
})
|
})
|
||||||
|
connection.on('future-salts', (salts: mtp.RawMt_future_salt[]) => {
|
||||||
|
Promise.resolve(this.manager._storage.setFutureSalts(this.dcId, salts)).catch((e: Error) =>
|
||||||
|
this.manager.params._emitError(e),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
connection.on('auth-begin', () => {
|
connection.on('auth-begin', () => {
|
||||||
// we need to propagate auth-begin to all connections
|
// we need to propagate auth-begin to all connections
|
||||||
|
@ -354,13 +359,20 @@ export class DcConnectionManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
async loadKeys(forcePfs = false): Promise<boolean> {
|
async loadKeys(forcePfs = false): Promise<boolean> {
|
||||||
const permanent = await this.manager._storage.getAuthKeyFor(this.dcId)
|
const [permanent, salts] = await Promise.all([
|
||||||
|
this.manager._storage.getAuthKeyFor(this.dcId),
|
||||||
|
this.manager._storage.getFutureSalts(this.dcId),
|
||||||
|
])
|
||||||
|
|
||||||
this.main.setAuthKey(permanent)
|
this.main.setAuthKey(permanent)
|
||||||
this.upload.setAuthKey(permanent)
|
this.upload.setAuthKey(permanent)
|
||||||
this.download.setAuthKey(permanent)
|
this.download.setAuthKey(permanent)
|
||||||
this.downloadSmall.setAuthKey(permanent)
|
this.downloadSmall.setAuthKey(permanent)
|
||||||
|
|
||||||
|
if (salts) {
|
||||||
|
this._salts.setFutureSalts(salts)
|
||||||
|
}
|
||||||
|
|
||||||
if (!permanent) {
|
if (!permanent) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,8 @@
|
||||||
import EventEmitter from 'events'
|
|
||||||
import Long from 'long'
|
import Long from 'long'
|
||||||
|
|
||||||
import { mtp } from '@mtcute/tl'
|
import { mtp } from '@mtcute/tl'
|
||||||
|
|
||||||
export class ServerSaltManager extends EventEmitter {
|
export class ServerSaltManager {
|
||||||
private _futureSalts: mtp.RawMt_future_salt[] = []
|
private _futureSalts: mtp.RawMt_future_salt[] = []
|
||||||
|
|
||||||
currentSalt = Long.ZERO
|
currentSalt = Long.ZERO
|
||||||
|
@ -17,12 +16,15 @@ export class ServerSaltManager extends EventEmitter {
|
||||||
setFutureSalts(salts: mtp.RawMt_future_salt[]): void {
|
setFutureSalts(salts: mtp.RawMt_future_salt[]): void {
|
||||||
this._futureSalts = salts
|
this._futureSalts = salts
|
||||||
|
|
||||||
if (Date.now() > salts[0].validSince * 1000) {
|
const now = Date.now() / 1000
|
||||||
|
|
||||||
|
while (salts.length > 0 && now > salts[0].validSince) {
|
||||||
this.currentSalt = salts[0].salt
|
this.currentSalt = salts[0].salt
|
||||||
this._futureSalts.shift()
|
this._futureSalts.shift()
|
||||||
}
|
}
|
||||||
|
|
||||||
this._scheduleNext()
|
if (!this._futureSalts.length) this.currentSalt = Long.ZERO
|
||||||
|
else this._scheduleNext()
|
||||||
}
|
}
|
||||||
|
|
||||||
private _timer?: NodeJS.Timeout
|
private _timer?: NodeJS.Timeout
|
||||||
|
|
|
@ -1257,8 +1257,11 @@ export class SessionConnection extends PersistentConnection {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.log.debug('received future_salts: %d salts', msg.salts.length)
|
||||||
|
|
||||||
this._salts.isFetching = false
|
this._salts.isFetching = false
|
||||||
this._salts.setFutureSalts(msg.salts)
|
this._salts.setFutureSalts(msg.salts.slice())
|
||||||
|
this.emit('future-salts', msg.salts)
|
||||||
}
|
}
|
||||||
|
|
||||||
private _onDestroySessionResult(msg: mtp.TypeDestroySessionRes): void {
|
private _onDestroySessionResult(msg: mtp.TypeDestroySessionRes): void {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
import { tl } from '@mtcute/tl'
|
import { mtp, tl } from '@mtcute/tl'
|
||||||
import { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime'
|
import { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime'
|
||||||
|
|
||||||
import { BasicPeerType, MaybeAsync } from '../types/index.js'
|
import { BasicPeerType, MaybeAsync } from '../types/index.js'
|
||||||
|
@ -94,6 +94,18 @@ export interface ITelegramStorage {
|
||||||
*/
|
*/
|
||||||
getDefaultDcs(): MaybeAsync<ITelegramStorage.DcOptions | null>
|
getDefaultDcs(): MaybeAsync<ITelegramStorage.DcOptions | null>
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Store information about future salts for a given DC
|
||||||
|
*/
|
||||||
|
setFutureSalts(dcId: number, salts: mtp.RawMt_future_salt[]): MaybeAsync<void>
|
||||||
|
/**
|
||||||
|
* Get information about future salts for a given DC (if available)
|
||||||
|
*
|
||||||
|
* You don't need to implement any checks, they will be done by the library.
|
||||||
|
* It is enough to just return the same array that was passed to `setFutureSalts`.
|
||||||
|
*/
|
||||||
|
getFutureSalts(dcId: number): MaybeAsync<mtp.RawMt_future_salt[] | null>
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get auth_key for a given DC
|
* Get auth_key for a given DC
|
||||||
* (returning null will start authorization)
|
* (returning null will start authorization)
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/* eslint-disable @typescript-eslint/no-unsafe-argument */
|
/* eslint-disable @typescript-eslint/no-unsafe-argument */
|
||||||
import { tl } from '@mtcute/tl'
|
import { mtp, tl } from '@mtcute/tl'
|
||||||
import { TlBinaryReader, TlBinaryWriter, TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime'
|
import { TlBinaryReader, TlBinaryWriter, TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime'
|
||||||
|
|
||||||
import { Logger } from '../utils/logger.js'
|
import { Logger } from '../utils/logger.js'
|
||||||
|
@ -328,6 +328,30 @@ export class IdbStorage implements ITelegramStorage {
|
||||||
return this._getFromKv('dcs')
|
return this._getFromKv('dcs')
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async getFutureSalts(dcId: number): Promise<mtp.RawMt_future_salt[] | null> {
|
||||||
|
const res = await this._getFromKv<string[]>(`futureSalts:${dcId}`)
|
||||||
|
if (!res) return null
|
||||||
|
|
||||||
|
return res.map((it) => {
|
||||||
|
const [salt, validSince, validUntil] = it.split(',')
|
||||||
|
|
||||||
|
return {
|
||||||
|
_: 'mt_future_salt',
|
||||||
|
validSince: Number(validSince),
|
||||||
|
validUntil: Number(validUntil),
|
||||||
|
salt: longFromFastString(salt),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
setFutureSalts(dcId: number, salts: mtp.RawMt_future_salt[]): Promise<void> {
|
||||||
|
return this._setToKv(
|
||||||
|
`futureSalts:${dcId}`,
|
||||||
|
salts.map((salt) => `${longToFastString(salt.salt)},${salt.validSince},${salt.validUntil}`),
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
async getAuthKeyFor(dcId: number, tempIndex?: number | undefined): Promise<Uint8Array | null> {
|
async getAuthKeyFor(dcId: number, tempIndex?: number | undefined): Promise<Uint8Array | null> {
|
||||||
let row: AuthKeyDto
|
let row: AuthKeyDto
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@ export class JsonMemoryStorage extends MemoryStorage {
|
||||||
}
|
}
|
||||||
case 'authKeysTempExpiry':
|
case 'authKeysTempExpiry':
|
||||||
case 'pts':
|
case 'pts':
|
||||||
|
case 'futureSalts':
|
||||||
return new Map(Object.entries(value as Record<string, string>))
|
return new Map(Object.entries(value as Record<string, string>))
|
||||||
case 'phoneIndex':
|
case 'phoneIndex':
|
||||||
case 'usernameIndex':
|
case 'usernameIndex':
|
||||||
|
|
|
@ -14,7 +14,7 @@ describe('MemoryStorage', () => {
|
||||||
constructor() {
|
constructor() {
|
||||||
super()
|
super()
|
||||||
this._setStateFrom({
|
this._setStateFrom({
|
||||||
$version: 2,
|
$version: 3,
|
||||||
defaultDcs: null,
|
defaultDcs: null,
|
||||||
authKeys: new Map(),
|
authKeys: new Map(),
|
||||||
authKeysTemp: new Map(),
|
authKeysTemp: new Map(),
|
||||||
|
@ -28,6 +28,7 @@ describe('MemoryStorage', () => {
|
||||||
rl: new Map(),
|
rl: new Map(),
|
||||||
refs: new Map(),
|
refs: new Map(),
|
||||||
self: null,
|
self: null,
|
||||||
|
futureSalts: new Map(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
import { tl } from '@mtcute/tl'
|
import { mtp, tl } from '@mtcute/tl'
|
||||||
|
|
||||||
import { LruMap, toggleChannelIdMark } from '../utils/index.js'
|
import { LruMap, toggleChannelIdMark } from '../utils/index.js'
|
||||||
import { ITelegramStorage } from './abstract.js'
|
import { ITelegramStorage } from './abstract.js'
|
||||||
|
|
||||||
const CURRENT_VERSION = 2
|
const CURRENT_VERSION = 3
|
||||||
|
|
||||||
type PeerInfoWithUpdated = ITelegramStorage.PeerInfo & { updated: number }
|
type PeerInfoWithUpdated = ITelegramStorage.PeerInfo & { updated: number }
|
||||||
|
|
||||||
|
@ -54,6 +54,7 @@ export interface MemorySessionState {
|
||||||
>
|
>
|
||||||
|
|
||||||
self: ITelegramStorage.SelfInfo | null
|
self: ITelegramStorage.SelfInfo | null
|
||||||
|
futureSalts: Map<number, mtp.RawMt_future_salt[]>
|
||||||
}
|
}
|
||||||
|
|
||||||
const USERNAME_TTL = 86400000 // 24 hours
|
const USERNAME_TTL = 86400000 // 24 hours
|
||||||
|
@ -126,6 +127,7 @@ export class MemoryStorage implements ITelegramStorage {
|
||||||
fsm: new Map(),
|
fsm: new Map(),
|
||||||
rl: new Map(),
|
rl: new Map(),
|
||||||
self: null,
|
self: null,
|
||||||
|
futureSalts: new Map(),
|
||||||
}
|
}
|
||||||
this._cachedInputPeers?.clear()
|
this._cachedInputPeers?.clear()
|
||||||
this._cachedFull?.clear()
|
this._cachedFull?.clear()
|
||||||
|
@ -143,7 +145,12 @@ export class MemoryStorage implements ITelegramStorage {
|
||||||
if (ver === 1) {
|
if (ver === 1) {
|
||||||
// v2: introduced message references
|
// v2: introduced message references
|
||||||
obj.refs = new Map()
|
obj.refs = new Map()
|
||||||
obj.$version = ver = 2
|
obj.$version = ver = 2 as any // eslint-disable-line
|
||||||
|
}
|
||||||
|
if (ver === 2) {
|
||||||
|
// v3: introduced future salts
|
||||||
|
obj.futureSalts = new Map()
|
||||||
|
obj.$version = ver = 3
|
||||||
}
|
}
|
||||||
if (ver !== CURRENT_VERSION) return
|
if (ver !== CURRENT_VERSION) return
|
||||||
|
|
||||||
|
@ -203,6 +210,14 @@ export class MemoryStorage implements ITelegramStorage {
|
||||||
this._state.defaultDcs = dcs
|
this._state.defaultDcs = dcs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
setFutureSalts(dcId: number, salts: mtp.RawMt_future_salt[]): void {
|
||||||
|
this._state.futureSalts.set(dcId, salts)
|
||||||
|
}
|
||||||
|
|
||||||
|
getFutureSalts(dcId: number): mtp.RawMt_future_salt[] | null {
|
||||||
|
return this._state.futureSalts.get(dcId) ?? null
|
||||||
|
}
|
||||||
|
|
||||||
setTempAuthKeyFor(dcId: number, index: number, key: Uint8Array | null, expiresAt: number): void {
|
setTempAuthKeyFor(dcId: number, index: number, key: Uint8Array | null, expiresAt: number): void {
|
||||||
const k = `${dcId}:${index}`
|
const k = `${dcId}:${index}`
|
||||||
|
|
||||||
|
@ -246,6 +261,9 @@ export class MemoryStorage implements ITelegramStorage {
|
||||||
this._state.authKeysTempExpiry.delete(key)
|
this._state.authKeysTempExpiry.delete(key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// future salts are linked to auth keys
|
||||||
|
this._state.futureSalts.delete(dcId)
|
||||||
}
|
}
|
||||||
|
|
||||||
updatePeers(peers: PeerInfoWithUpdated[]): void {
|
updatePeers(peers: PeerInfoWithUpdated[]): void {
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
import sqlite3, { Options } from 'better-sqlite3'
|
import sqlite3, { Options } from 'better-sqlite3'
|
||||||
|
|
||||||
import { ITelegramStorage, tl, toggleChannelIdMark } from '@mtcute/core'
|
import { ITelegramStorage, mtp, tl, toggleChannelIdMark } from '@mtcute/core'
|
||||||
import {
|
import {
|
||||||
Logger,
|
Logger,
|
||||||
longFromFastString,
|
longFromFastString,
|
||||||
|
@ -528,6 +528,29 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage*/ {
|
||||||
return this._getFromKv('def_dc')
|
return this._getFromKv('def_dc')
|
||||||
}
|
}
|
||||||
|
|
||||||
|
getFutureSalts(dcId: number): mtp.RawMt_future_salt[] | null {
|
||||||
|
return (
|
||||||
|
this._getFromKv<string[]>(`futureSalts:${dcId}`)?.map((it) => {
|
||||||
|
const [salt, validSince, validUntil] = it.split(',')
|
||||||
|
|
||||||
|
return {
|
||||||
|
_: 'mt_future_salt',
|
||||||
|
validSince: Number(validSince),
|
||||||
|
validUntil: Number(validUntil),
|
||||||
|
salt: longFromFastString(salt),
|
||||||
|
}
|
||||||
|
}) ?? null
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
setFutureSalts(dcId: number, salts: mtp.RawMt_future_salt[]): void {
|
||||||
|
return this._setToKv(
|
||||||
|
`futureSalts:${dcId}`,
|
||||||
|
salts.map((salt) => `${longToFastString(salt.salt)},${salt.validSince},${salt.validUntil}`),
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
getAuthKeyFor(dcId: number, tempIndex?: number): Uint8Array | null {
|
getAuthKeyFor(dcId: number, tempIndex?: number): Uint8Array | null {
|
||||||
let row
|
let row
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,7 @@ import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from 'vites
|
||||||
|
|
||||||
import { ITelegramStorage, MaybeAsync } from '@mtcute/core'
|
import { ITelegramStorage, MaybeAsync } from '@mtcute/core'
|
||||||
import { defaultProductionDc, hexEncode, Logger, LogManager, TlReaderMap, TlWriterMap } from '@mtcute/core/utils.js'
|
import { defaultProductionDc, hexEncode, Logger, LogManager, TlReaderMap, TlWriterMap } from '@mtcute/core/utils.js'
|
||||||
import { tl } from '@mtcute/tl'
|
import { mtp, tl } from '@mtcute/tl'
|
||||||
import { __tlReaderMap } from '@mtcute/tl/binary/reader.js'
|
import { __tlReaderMap } from '@mtcute/tl/binary/reader.js'
|
||||||
import { __tlWriterMap } from '@mtcute/tl/binary/writer.js'
|
import { __tlWriterMap } from '@mtcute/tl/binary/writer.js'
|
||||||
|
|
||||||
|
@ -176,6 +176,30 @@ export function testStorage<T extends ITelegramStorage>(
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
describe('future salts', () => {
|
||||||
|
const someFutureSalt1 = Long.fromBits(123, 456)
|
||||||
|
const someFutureSalt2 = Long.fromBits(789, 101112)
|
||||||
|
const someFutureSalt3 = Long.fromBits(131415, 161718)
|
||||||
|
const someFutureSalt4 = Long.fromBits(192021, 222324)
|
||||||
|
|
||||||
|
const salts1: mtp.RawMt_future_salt[] = [
|
||||||
|
{ _: 'mt_future_salt', validSince: 123, validUntil: 456, salt: someFutureSalt1 },
|
||||||
|
{ _: 'mt_future_salt', validSince: 789, validUntil: 101112, salt: someFutureSalt2 },
|
||||||
|
]
|
||||||
|
const salts2: mtp.RawMt_future_salt[] = [
|
||||||
|
{ _: 'mt_future_salt', validSince: 123, validUntil: 456, salt: someFutureSalt3 },
|
||||||
|
{ _: 'mt_future_salt', validSince: 789, validUntil: 101112, salt: someFutureSalt4 },
|
||||||
|
]
|
||||||
|
|
||||||
|
it('should store and retrieve future salts', async () => {
|
||||||
|
await s.setFutureSalts(1, salts1)
|
||||||
|
await s.setFutureSalts(2, salts2)
|
||||||
|
|
||||||
|
expect(await s.getFutureSalts(1)).toEqual(salts1)
|
||||||
|
expect(await s.getFutureSalts(2)).toEqual(salts2)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
describe('peers', () => {
|
describe('peers', () => {
|
||||||
it('should cache and return peers', async () => {
|
it('should cache and return peers', async () => {
|
||||||
await s.updatePeers([stubPeerUser, peerChannel])
|
await s.updatePeers([stubPeerUser, peerChannel])
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
import { describe, expect, it } from 'vitest'
|
import { describe, expect, it, vi } from 'vitest'
|
||||||
|
|
||||||
import { BaseTelegramClient } from '@mtcute/core'
|
import { BaseTelegramClient } from '@mtcute/core'
|
||||||
import { MemoryStorage } from '@mtcute/core/src/storage/memory.js'
|
import { MemoryStorage } from '@mtcute/core/src/storage/memory.js'
|
||||||
|
@ -31,11 +31,13 @@ describe('transport stub', () => {
|
||||||
}),
|
}),
|
||||||
})
|
})
|
||||||
|
|
||||||
await client.connect().catch(() => {}) // ignore "client closed" error
|
client.connect().catch(() => {}) // ignore "client closed" error
|
||||||
|
|
||||||
expect(log).toEqual([
|
await vi.waitFor(() =>
|
||||||
'message size=40', // req_pq_multi
|
expect(log).toEqual([
|
||||||
'connect 1.2.3.4:1234 test=false',
|
'message size=40', // req_pq_multi
|
||||||
])
|
'connect 1.2.3.4:1234 test=false',
|
||||||
|
]),
|
||||||
|
)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in a new issue