fix: client and updates fixes

This commit is contained in:
alina 🌸 2023-08-12 22:40:37 +03:00
parent 2b1bac053e
commit 85c43d804d
Signed by: teidesu
SSH key fingerprint: SHA256:uNeCpw6aTSU4aIObXLvHfLkDa82HWH9EiOj9AXOIRpI
15 changed files with 121 additions and 191 deletions

View file

@ -38,19 +38,19 @@ export async function checkPassword(
'user',
)
this.log.prefix = `[USER ${this._userId}] `
this._userId = res.user.id
this.log.prefix = `[USER ${this._userId}] `
this._isBot = false
this._selfChanged = true
this._selfUsername = res.user.username ?? null
await this.network.notifyLoggedIn(res)
await this._fetchUpdatesState()
await this._saveStorage()
// telegram ignores invokeWithoutUpdates for auth methods
// todo where is this._disableUpdates?
// if (this._disableUpdates) this.primaryConnection._resetSession()
// else
this.startUpdatesLoop()
if (this.network.params.disableUpdates) this.network.resetSessions()
else this.startUpdatesLoop()
return new User(this, res.user)
}

View file

@ -33,19 +33,20 @@ export async function signInBot(
'user',
)
this.log.prefix = `[USER ${this._userId}] `
this._userId = res.user.id
this.log.prefix = `[USER ${this._userId}] `
this._isBot = true
this._selfUsername = res.user.username!
this._selfChanged = true
await this.network.notifyLoggedIn(res)
await this._fetchUpdatesState()
await this._saveStorage()
// telegram ignores invokeWithoutUpdates for auth methods
// todo where is this._disableUpdates?
// if (this._disableUpdates) this.primaryConnection._resetSession()
// else
this.startUpdatesLoop()
if (this.network.params.disableUpdates) this.network.resetSessions()
else this.startUpdatesLoop()
return new User(this, res.user)
}

View file

@ -41,19 +41,19 @@ export async function signIn(
assertTypeIs('signIn (@ auth.signIn -> user)', res.user, 'user')
this.log.prefix = `[USER ${this._userId}] `
this._userId = res.user.id
this.log.prefix = `[USER ${this._userId}] `
this._isBot = false
this._selfChanged = true
this._selfUsername = res.user.username ?? null
await this.network.notifyLoggedIn(res)
await this._fetchUpdatesState()
await this._saveStorage()
// telegram ignores invokeWithoutUpdates for auth methods
// todo where is this._disableUpdates?
// if (this._disableUpdates) this.primaryConnection._resetSession()
// else
this.startUpdatesLoop()
if (this.network.params.disableUpdates) this.network.resetSessions()
else this.startUpdatesLoop()
return new User(this, res.user)
}

View file

@ -32,18 +32,19 @@ export async function signUp(
assertTypeIs('signUp (@ auth.signUp)', res, 'auth.authorization')
assertTypeIs('signUp (@ auth.signUp -> user)', res.user, 'user')
this.log.prefix = `[USER ${this._userId}] `
this._userId = res.user.id
this.log.prefix = `[USER ${this._userId}] `
this._isBot = false
this._selfChanged = true
await this.network.notifyLoggedIn(res)
await this._fetchUpdatesState()
await this._saveStorage()
// telegram ignores invokeWithoutUpdates for auth methods
// todo where is this._disableUpdates?
// if (this._disableUpdates) this.primaryConnection._resetSession()
// else
this.startUpdatesLoop()
if (this.network.params.disableUpdates) this.network.resetSessions()
else this.startUpdatesLoop()
return new User(this, res.user)
}

View file

@ -227,8 +227,6 @@ export class BaseTelegramClient extends EventEmitter {
this.call({ _: 'help.getConfig' }),
)
private _additionalConnections: SessionConnection[] = []
// not really connected, but rather "connect() was called"
private _connected: ControllablePromise<void> | boolean = false
@ -335,6 +333,9 @@ export class BaseTelegramClient extends EventEmitter {
return
}
// we cant do this in constructor because we need to support subclassing
this.network.setUpdateHandler(this._handleUpdate.bind(this))
const promise = (this._connected = createControllablePromise())
await this._loadStorage()
@ -406,36 +407,10 @@ export class BaseTelegramClient extends EventEmitter {
this._config.destroy()
this.network.destroy()
// close additional connections
this._additionalConnections.forEach((conn) => conn.destroy())
await this._saveStorage()
await this.storage.destroy?.()
}
/**
* Change primary DC and write that fact to the storage.
* Will immediately reconnect to another DC.
*
* @param newDc New DC or its ID
*/
async changeDc(newDc: tl.RawDcOption | number): Promise<void> {
if (typeof newDc === 'number') {
const res = await this._config.findOption({
dcId: newDc,
allowIpv6: this._useIpv6,
})
if (!res) throw new Error('DC not found')
newDc = res
}
this._defaultDc = newDc
await this.storage.setDefaultDc(newDc)
await this._saveStorage()
// todo
// await this.primaryConnection.changeDc(newDc)
}
/**
* Make an RPC call to the primary DC.
* This method handles DC migration, flood waits and retries automatically.
@ -465,118 +440,6 @@ export class BaseTelegramClient extends EventEmitter {
return res
}
// /**
// * Creates an additional connection to a given DC.
// * This will use auth key for that DC that was already stored
// * in the session, or generate a new auth key by exporting
// * authorization from primary DC and importing it to the new DC.
// * New connection will use the same crypto provider, `initConnection`,
// * transport and reconnection strategy as the primary connection
// *
// * This method is quite low-level and you shouldn't usually care about this
// * when using high-level API provided by `@mtcute/client`.
// *
// * @param dcId DC id, to which the connection will be created
// * @param cdn Whether that DC is a CDN DC
// * @param inactivityTimeout
// * Inactivity timeout for the connection (in ms), after which the transport will be closed.
// * Note that connection can still be used normally, it's just the transport which is closed.
// * Defaults to 5 min
// */
// async createAdditionalConnection(
// dcId: number,
// params?: {
// // todo proper docs
// // default = false
// media?: boolean
// // default = fa;se
// cdn?: boolean
// // default = 300_000
// inactivityTimeout?: number
// // default = false
// disableUpdates?: boolean
// }
// ): Promise<SessionConnection> {
// const dc = await this._config.findOption({
// dcId,
// preferMedia: params?.media,
// cdn: params?.cdn,
// allowIpv6: this._useIpv6,
// })
// if (!dc) throw new Error('DC not found')
// const connection = new SessionConnection(
// {
// dc,
// testMode: this._testMode,
// crypto: this._crypto,
// initConnection: this._initConnectionParams,
// transportFactory: this._transportFactory,
// reconnectionStrategy: this._reconnectionStrategy,
// inactivityTimeout: params?.inactivityTimeout ?? 300_000,
// layer: this._layer,
// disableUpdates: params?.disableUpdates,
// readerMap: this._readerMap,
// writerMap: this._writerMap,
// },
// this.log.create('connection')
// )
//
// connection.on('error', (err) => this._emitError(err, connection))
// await connection.setupKeys(await this.storage.getAuthKeyFor(dc.id))
// connection.connect()
//
// if (!connection.getAuthKey()) {
// this.log.info('exporting auth to DC %d', dcId)
// const auth = await this.call({
// _: 'auth.exportAuthorization',
// dcId,
// })
// await connection.sendRpc({
// _: 'auth.importAuthorization',
// id: auth.id,
// bytes: auth.bytes,
// })
//
// // connection.authKey was already generated at this point
// this.storage.setAuthKeyFor(dc.id, connection.getAuthKey()!)
// await this._saveStorage()
// } else {
// // in case the auth key is invalid
// const dcId = dc.id
// connection.on('key-change', async (key) => {
// // we don't need to export, it will be done by `.call()`
// // in case this error is returned
// //
// // even worse, exporting here will lead to a race condition,
// // and may result in redundant re-exports.
//
// this.storage.setAuthKeyFor(dcId, key)
// await this._saveStorage()
// })
// }
//
// this._additionalConnections.push(connection)
//
// return connection
// }
/**
* Destroy a connection that was previously created using
* {@link BaseTelegramClient.createAdditionalConnection}.
* Passing any other connection will not have any effect.
*
* @param connection Connection created with {@link BaseTelegramClient.createAdditionalConnection}
*/
async destroyAdditionalConnection(
connection: SessionConnection,
): Promise<void> {
const idx = this._additionalConnections.indexOf(connection)
if (idx === -1) return
await connection.destroy()
this._additionalConnections.splice(idx, 1)
}
/**
* Change transport for the client.
*
@ -589,11 +452,7 @@ export class BaseTelegramClient extends EventEmitter {
*/
changeTransport(factory: TransportFactory): void {
// todo
// this.primaryConnection.changeTransport(factory)
this._additionalConnections.forEach((conn) =>
conn.changeTransport(factory),
)
this.network.changeTransport(factory)
}
/**

View file

@ -64,6 +64,7 @@ export class ConfigManager {
dcId: number
allowIpv6?: boolean
preferIpv6?: boolean
allowMedia?: boolean
preferMedia?: boolean
cdn?: boolean
}): Promise<tl.RawDcOption | undefined> {
@ -72,6 +73,7 @@ export class ConfigManager {
const options = this._config!.dcOptions.filter((opt) => {
if (opt.tcpoOnly) return false // unsupported
if (opt.ipv6 && !params.allowIpv6) return false
if (opt.mediaOnly && !params.allowMedia) return false
if (opt.cdn && !params.cdn) return false
return opt.id === params.dcId

View file

@ -8,6 +8,7 @@ import {
SessionConnection,
SessionConnectionParams,
} from './session-connection'
import { TransportFactory } from './transports'
export class MultiSessionConnection extends EventEmitter {
private _log: Logger
@ -293,4 +294,18 @@ export class MultiSessionConnection extends EventEmitter {
requestAuth(): void {
this._connections[0]._authorize()
}
resetSessions(): void {
if (this.params.isMainConnection) {
for (const conn of this._connections) {
conn._resetSession()
}
} else {
this._connections[0]._resetSession()
}
}
changeTransport(factory: TransportFactory): void {
this._connections.forEach((conn) => conn.changeTransport(factory))
}
}

View file

@ -500,7 +500,7 @@ export class NetworkManager {
dcId,
allowIpv6: this.params.useIpv6,
preferIpv6: this.params.useIpv6,
preferMedia: dcId !== this._primaryDc?.dcId,
allowMedia: false,
cdn: false,
})
@ -541,14 +541,6 @@ export class NetworkManager {
dcId: manager.dcId,
})
// manager.ensureMainConnection()
//
// if (!manager.main._sessions[0]._authKey.ready) {
// await manager.loadKeys()
// }
//
// manager.main.ensureConnected()
const res = await this.call(
{
_: 'auth.importAuthorization',
@ -584,13 +576,26 @@ export class NetworkManager {
}
async notifyLoggedIn(auth: tl.auth.TypeAuthorization): Promise<void> {
if (auth._ === 'auth.authorizationSignUpRequired') return
if (
auth._ === 'auth.authorizationSignUpRequired' ||
auth.user._ === 'userEmpty'
) { return }
if (auth.tmpSessions) {
this._primaryDc?.main.setCount(auth.tmpSessions)
}
// await this.exportAuth()
await this.exportAuth()
}
resetSessions(): void {
const dc = this._primaryDc
if (!dc) return
dc.main.resetSessions()
dc.upload.resetSessions()
dc.download.resetSessions()
dc.downloadSmall.resetSessions()
}
private _onConfigChanged(config: tl.RawConfig): void {
@ -607,6 +612,7 @@ export class NetworkManager {
allowIpv6: this.params.useIpv6,
preferIpv6: this.params.useIpv6,
cdn: false,
allowMedia: false,
})
if (!option) {
@ -750,6 +756,19 @@ export class NetworkManager {
throw lastError
}
setUpdateHandler(handler: NetworkManager['_updateHandler']): void {
this._updateHandler = handler
}
changeTransport(factory: TransportFactory): void {
Object.values(this._dcConnections).forEach((dc) => {
dc.main.changeTransport(factory)
dc.upload.changeTransport(factory)
dc.download.changeTransport(factory)
dc.downloadSmall.changeTransport(factory)
})
}
destroy(): void {
for (const dc of Object.values(this._dcConnections)) {
dc.main.destroy()

View file

@ -30,7 +30,7 @@ export abstract class PersistentConnection extends EventEmitter {
private _uid = nextConnectionUid++
readonly params: PersistentConnectionParams
private _transport!: ITelegramTransport
protected _transport!: ITelegramTransport
private _sendOnceConnected: Buffer[] = []

View file

@ -261,7 +261,7 @@ export class SessionConnection extends PersistentConnection {
}
}
this.emit('error', error)
this.emit('api-error', error)
}
protected onConnectionUsable() {
@ -271,7 +271,10 @@ export class SessionConnection extends PersistentConnection {
// we must send some user-related rpc to the server to make sure that
// it will send us updates
this.sendRpc({ _: 'updates.getState' }).catch((err: any) => {
this.log.warn('failed to send updates.getState: %s', err)
this.log.warn(
'failed to send updates.getState: %s',
err.text || err.message,
)
})
}
@ -1453,11 +1456,13 @@ export class SessionConnection extends PersistentConnection {
method,
this.params.layer,
)
const proxy = this._transport.getMtproxyInfo?.()
obj = {
_: 'invokeWithLayer',
layer: this.params.layer,
query: {
...this.params.initConnection,
proxy,
query: obj,
},
}

View file

@ -58,6 +58,8 @@ export interface ITelegramTransport extends EventEmitter {
* This method is called before any other.
*/
setup?(crypto: ICryptoProvider, log: Logger): void
getMtproxyInfo?(): tl.RawInputClientProxy
}
/** Transport factory function */

View file

@ -91,10 +91,20 @@ export class MtProxyTcpTransport extends BaseTcpTransport {
}
}
getMtproxyInfo(): tl.RawInputClientProxy {
return {
_: 'inputClientProxy',
address: this._proxy.host,
port: this._proxy.port,
}
}
_packetCodec!: IPacketCodec
connect(dc: tl.RawDcOption, testMode: boolean): void {
if (this._state !== TransportState.Idle) { throw new Error('Transport is not IDLE') }
if (this._state !== TransportState.Idle) {
throw new Error('Transport is not IDLE')
}
if (this._packetCodec && this._currentDc?.id !== dc.id) {
// dc changed, thus the codec's init will change too

View file

@ -12,6 +12,6 @@
},
"dependencies": {
"@mtcute/core": "workspace:^1.0.0",
"ip6": "0.2.10"
"ip6": "0.2.7"
}
}

View file

@ -401,7 +401,8 @@ export class SqliteStorage implements ITelegramStorage, IStateStorage {
private _initializeStatements(): void {
this._statements = {} as unknown as typeof this._statements
Object.entries(STATEMENTS).forEach(([name, sql]) => {
this._statements[name as keyof typeof this._statements] = this._db.prepare(sql)
this._statements[name as keyof typeof this._statements] =
this._db.prepare(sql)
})
}
@ -417,7 +418,7 @@ export class SqliteStorage implements ITelegramStorage, IStateStorage {
const versionResult = this._db
.prepare("select value from kv where key = 'ver'")
.get()
const version = (versionResult as { value: number }).value
const version = Number((versionResult as { value: number }).value)
this.log.debug('current db version = %d', version)
@ -446,7 +447,10 @@ export class SqliteStorage implements ITelegramStorage, IStateStorage {
load(): void {
this._db = sqlite3(this._filename, {
verbose: this.log.mgr.level === 5 ? this.log.verbose as Options['verbose'] : undefined,
verbose:
this.log.mgr.level === 5 ?
(this.log.verbose as Options['verbose']) :
undefined,
})
this._initialize()
@ -648,7 +652,9 @@ export class SqliteStorage implements ITelegramStorage, IStateStorage {
const cached = this._cache?.get(peerId)
if (cached) return cached.peer
const row = this._statements.getEntById.get(peerId) as SqliteEntity | null
const row = this._statements.getEntById.get(
peerId,
) as SqliteEntity | null
if (row) {
const peer = getInputPeer(row)
@ -664,7 +670,9 @@ export class SqliteStorage implements ITelegramStorage, IStateStorage {
}
getPeerByPhone(phone: string): tl.TypeInputPeer | null {
const row = this._statements.getEntByPhone.get(phone) as SqliteEntity | null
const row = this._statements.getEntByPhone.get(
phone,
) as SqliteEntity | null
if (row) {
const peer = getInputPeer(row)
@ -680,7 +688,9 @@ export class SqliteStorage implements ITelegramStorage, IStateStorage {
}
getPeerByUsername(username: string): tl.TypeInputPeer | null {
const row = this._statements.getEntByUser.get(username.toLowerCase()) as SqliteEntity | null
const row = this._statements.getEntByUser.get(
username.toLowerCase(),
) as SqliteEntity | null
if (!row || Date.now() - row.updated > USERNAME_TTL) return null
if (row) {

View file

@ -1,5 +1,9 @@
lockfileVersion: '6.0'
settings:
autoInstallPeers: true
excludeLinksFromLockfile: false
importers:
.:
@ -272,8 +276,8 @@ importers:
specifier: workspace:^1.0.0
version: link:../core
ip6:
specifier: 0.2.10
version: 0.2.10
specifier: 0.2.7
version: 0.2.7
packages/sqlite:
dependencies:
@ -3012,6 +3016,7 @@ packages:
/iconv-lite@0.6.3:
resolution: {integrity: sha512-4fCk79wshMdzMp2rH06qWrJE4iolqLhCUH+OiuIgU++RB0+94NlDL81atO7GX55uUKueo0txHNtvEyI6D7WdMw==}
engines: {node: '>=0.10.0'}
requiresBuild: true
dependencies:
safer-buffer: 2.1.2
dev: false
@ -3067,8 +3072,8 @@ packages:
side-channel: 1.0.4
dev: true
/ip6@0.2.10:
resolution: {integrity: sha512-1LdpyKjhvepd6EbAU6rW4g14vuYtx5TnJX9TfZZBhsM6DsyPQLNzW12rtbUqXBMwqFrLVV/Gcxv0GNFvJp2cYA==}
/ip6@0.2.7:
resolution: {integrity: sha512-zEzGsxn4Uw33TByv0DdX/RRh+VsGfEctOp7CvJq/b4JEjY9OvPB58dsMYiEwIVLsIWHZSJPn3XG8mP9Qv3TG3g==}
hasBin: true
dev: false
@ -4580,6 +4585,7 @@ packages:
/safer-buffer@2.1.2:
resolution: {integrity: sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg==}
requiresBuild: true
dev: false
optional: true