From 85c43d804d71c99965225f7088927e6b7c261d15 Mon Sep 17 00:00:00 2001 From: Alina Sireneva Date: Sat, 12 Aug 2023 22:40:37 +0300 Subject: [PATCH] fix: client and updates fixes --- .../client/src/methods/auth/check-password.ts | 10 +- .../client/src/methods/auth/sign-in-bot.ts | 11 +- packages/client/src/methods/auth/sign-in.ts | 10 +- packages/client/src/methods/auth/sign-up.ts | 11 +- packages/core/src/base-client.ts | 149 +----------------- packages/core/src/network/config-manager.ts | 2 + .../src/network/multi-session-connection.ts | 15 ++ packages/core/src/network/network-manager.ts | 41 +++-- .../core/src/network/persistent-connection.ts | 2 +- .../core/src/network/session-connection.ts | 9 +- .../core/src/network/transports/abstract.ts | 2 + packages/mtproxy/index.ts | 12 +- packages/socks-proxy/package.json | 2 +- packages/sqlite/index.ts | 22 ++- pnpm-lock.yaml | 14 +- 15 files changed, 121 insertions(+), 191 deletions(-) diff --git a/packages/client/src/methods/auth/check-password.ts b/packages/client/src/methods/auth/check-password.ts index 08a1f9d6..4c0aeb1a 100644 --- a/packages/client/src/methods/auth/check-password.ts +++ b/packages/client/src/methods/auth/check-password.ts @@ -38,19 +38,19 @@ export async function checkPassword( 'user', ) - this.log.prefix = `[USER ${this._userId}] ` this._userId = res.user.id + this.log.prefix = `[USER ${this._userId}] ` this._isBot = false this._selfChanged = true this._selfUsername = res.user.username ?? null + await this.network.notifyLoggedIn(res) + await this._fetchUpdatesState() await this._saveStorage() // telegram ignores invokeWithoutUpdates for auth methods - // todo where is this._disableUpdates? - // if (this._disableUpdates) this.primaryConnection._resetSession() - // else - this.startUpdatesLoop() + if (this.network.params.disableUpdates) this.network.resetSessions() + else this.startUpdatesLoop() return new User(this, res.user) } diff --git a/packages/client/src/methods/auth/sign-in-bot.ts b/packages/client/src/methods/auth/sign-in-bot.ts index d78a5d07..a7a49ea2 100644 --- a/packages/client/src/methods/auth/sign-in-bot.ts +++ b/packages/client/src/methods/auth/sign-in-bot.ts @@ -33,19 +33,20 @@ export async function signInBot( 'user', ) - this.log.prefix = `[USER ${this._userId}] ` this._userId = res.user.id + this.log.prefix = `[USER ${this._userId}] ` this._isBot = true this._selfUsername = res.user.username! this._selfChanged = true + + await this.network.notifyLoggedIn(res) + await this._fetchUpdatesState() await this._saveStorage() // telegram ignores invokeWithoutUpdates for auth methods - // todo where is this._disableUpdates? - // if (this._disableUpdates) this.primaryConnection._resetSession() - // else - this.startUpdatesLoop() + if (this.network.params.disableUpdates) this.network.resetSessions() + else this.startUpdatesLoop() return new User(this, res.user) } diff --git a/packages/client/src/methods/auth/sign-in.ts b/packages/client/src/methods/auth/sign-in.ts index e5982233..d37afd6c 100644 --- a/packages/client/src/methods/auth/sign-in.ts +++ b/packages/client/src/methods/auth/sign-in.ts @@ -41,19 +41,19 @@ export async function signIn( assertTypeIs('signIn (@ auth.signIn -> user)', res.user, 'user') - this.log.prefix = `[USER ${this._userId}] ` this._userId = res.user.id + this.log.prefix = `[USER ${this._userId}] ` this._isBot = false this._selfChanged = true this._selfUsername = res.user.username ?? null + await this.network.notifyLoggedIn(res) + await this._fetchUpdatesState() await this._saveStorage() // telegram ignores invokeWithoutUpdates for auth methods - // todo where is this._disableUpdates? - // if (this._disableUpdates) this.primaryConnection._resetSession() - // else - this.startUpdatesLoop() + if (this.network.params.disableUpdates) this.network.resetSessions() + else this.startUpdatesLoop() return new User(this, res.user) } diff --git a/packages/client/src/methods/auth/sign-up.ts b/packages/client/src/methods/auth/sign-up.ts index 7b98fc98..b9449e2c 100644 --- a/packages/client/src/methods/auth/sign-up.ts +++ b/packages/client/src/methods/auth/sign-up.ts @@ -32,18 +32,19 @@ export async function signUp( assertTypeIs('signUp (@ auth.signUp)', res, 'auth.authorization') assertTypeIs('signUp (@ auth.signUp -> user)', res.user, 'user') - this.log.prefix = `[USER ${this._userId}] ` this._userId = res.user.id + this.log.prefix = `[USER ${this._userId}] ` this._isBot = false this._selfChanged = true + + await this.network.notifyLoggedIn(res) + await this._fetchUpdatesState() await this._saveStorage() // telegram ignores invokeWithoutUpdates for auth methods - // todo where is this._disableUpdates? - // if (this._disableUpdates) this.primaryConnection._resetSession() - // else - this.startUpdatesLoop() + if (this.network.params.disableUpdates) this.network.resetSessions() + else this.startUpdatesLoop() return new User(this, res.user) } diff --git a/packages/core/src/base-client.ts b/packages/core/src/base-client.ts index ab65f13f..48b9d1d2 100644 --- a/packages/core/src/base-client.ts +++ b/packages/core/src/base-client.ts @@ -227,8 +227,6 @@ export class BaseTelegramClient extends EventEmitter { this.call({ _: 'help.getConfig' }), ) - private _additionalConnections: SessionConnection[] = [] - // not really connected, but rather "connect() was called" private _connected: ControllablePromise | boolean = false @@ -335,6 +333,9 @@ export class BaseTelegramClient extends EventEmitter { return } + // we cant do this in constructor because we need to support subclassing + this.network.setUpdateHandler(this._handleUpdate.bind(this)) + const promise = (this._connected = createControllablePromise()) await this._loadStorage() @@ -406,36 +407,10 @@ export class BaseTelegramClient extends EventEmitter { this._config.destroy() this.network.destroy() - // close additional connections - this._additionalConnections.forEach((conn) => conn.destroy()) - await this._saveStorage() await this.storage.destroy?.() } - /** - * Change primary DC and write that fact to the storage. - * Will immediately reconnect to another DC. - * - * @param newDc New DC or its ID - */ - async changeDc(newDc: tl.RawDcOption | number): Promise { - if (typeof newDc === 'number') { - const res = await this._config.findOption({ - dcId: newDc, - allowIpv6: this._useIpv6, - }) - if (!res) throw new Error('DC not found') - newDc = res - } - - this._defaultDc = newDc - await this.storage.setDefaultDc(newDc) - await this._saveStorage() - // todo - // await this.primaryConnection.changeDc(newDc) - } - /** * Make an RPC call to the primary DC. * This method handles DC migration, flood waits and retries automatically. @@ -465,118 +440,6 @@ export class BaseTelegramClient extends EventEmitter { return res } - // /** - // * Creates an additional connection to a given DC. - // * This will use auth key for that DC that was already stored - // * in the session, or generate a new auth key by exporting - // * authorization from primary DC and importing it to the new DC. - // * New connection will use the same crypto provider, `initConnection`, - // * transport and reconnection strategy as the primary connection - // * - // * This method is quite low-level and you shouldn't usually care about this - // * when using high-level API provided by `@mtcute/client`. - // * - // * @param dcId DC id, to which the connection will be created - // * @param cdn Whether that DC is a CDN DC - // * @param inactivityTimeout - // * Inactivity timeout for the connection (in ms), after which the transport will be closed. - // * Note that connection can still be used normally, it's just the transport which is closed. - // * Defaults to 5 min - // */ - // async createAdditionalConnection( - // dcId: number, - // params?: { - // // todo proper docs - // // default = false - // media?: boolean - // // default = fa;se - // cdn?: boolean - // // default = 300_000 - // inactivityTimeout?: number - // // default = false - // disableUpdates?: boolean - // } - // ): Promise { - // const dc = await this._config.findOption({ - // dcId, - // preferMedia: params?.media, - // cdn: params?.cdn, - // allowIpv6: this._useIpv6, - // }) - // if (!dc) throw new Error('DC not found') - // const connection = new SessionConnection( - // { - // dc, - // testMode: this._testMode, - // crypto: this._crypto, - // initConnection: this._initConnectionParams, - // transportFactory: this._transportFactory, - // reconnectionStrategy: this._reconnectionStrategy, - // inactivityTimeout: params?.inactivityTimeout ?? 300_000, - // layer: this._layer, - // disableUpdates: params?.disableUpdates, - // readerMap: this._readerMap, - // writerMap: this._writerMap, - // }, - // this.log.create('connection') - // ) - // - // connection.on('error', (err) => this._emitError(err, connection)) - // await connection.setupKeys(await this.storage.getAuthKeyFor(dc.id)) - // connection.connect() - // - // if (!connection.getAuthKey()) { - // this.log.info('exporting auth to DC %d', dcId) - // const auth = await this.call({ - // _: 'auth.exportAuthorization', - // dcId, - // }) - // await connection.sendRpc({ - // _: 'auth.importAuthorization', - // id: auth.id, - // bytes: auth.bytes, - // }) - // - // // connection.authKey was already generated at this point - // this.storage.setAuthKeyFor(dc.id, connection.getAuthKey()!) - // await this._saveStorage() - // } else { - // // in case the auth key is invalid - // const dcId = dc.id - // connection.on('key-change', async (key) => { - // // we don't need to export, it will be done by `.call()` - // // in case this error is returned - // // - // // even worse, exporting here will lead to a race condition, - // // and may result in redundant re-exports. - // - // this.storage.setAuthKeyFor(dcId, key) - // await this._saveStorage() - // }) - // } - // - // this._additionalConnections.push(connection) - // - // return connection - // } - - /** - * Destroy a connection that was previously created using - * {@link BaseTelegramClient.createAdditionalConnection}. - * Passing any other connection will not have any effect. - * - * @param connection Connection created with {@link BaseTelegramClient.createAdditionalConnection} - */ - async destroyAdditionalConnection( - connection: SessionConnection, - ): Promise { - const idx = this._additionalConnections.indexOf(connection) - if (idx === -1) return - - await connection.destroy() - this._additionalConnections.splice(idx, 1) - } - /** * Change transport for the client. * @@ -589,11 +452,7 @@ export class BaseTelegramClient extends EventEmitter { */ changeTransport(factory: TransportFactory): void { // todo - // this.primaryConnection.changeTransport(factory) - - this._additionalConnections.forEach((conn) => - conn.changeTransport(factory), - ) + this.network.changeTransport(factory) } /** diff --git a/packages/core/src/network/config-manager.ts b/packages/core/src/network/config-manager.ts index 80690d28..2d6d6e69 100644 --- a/packages/core/src/network/config-manager.ts +++ b/packages/core/src/network/config-manager.ts @@ -64,6 +64,7 @@ export class ConfigManager { dcId: number allowIpv6?: boolean preferIpv6?: boolean + allowMedia?: boolean preferMedia?: boolean cdn?: boolean }): Promise { @@ -72,6 +73,7 @@ export class ConfigManager { const options = this._config!.dcOptions.filter((opt) => { if (opt.tcpoOnly) return false // unsupported if (opt.ipv6 && !params.allowIpv6) return false + if (opt.mediaOnly && !params.allowMedia) return false if (opt.cdn && !params.cdn) return false return opt.id === params.dcId diff --git a/packages/core/src/network/multi-session-connection.ts b/packages/core/src/network/multi-session-connection.ts index 24b00e50..9730a401 100644 --- a/packages/core/src/network/multi-session-connection.ts +++ b/packages/core/src/network/multi-session-connection.ts @@ -8,6 +8,7 @@ import { SessionConnection, SessionConnectionParams, } from './session-connection' +import { TransportFactory } from './transports' export class MultiSessionConnection extends EventEmitter { private _log: Logger @@ -293,4 +294,18 @@ export class MultiSessionConnection extends EventEmitter { requestAuth(): void { this._connections[0]._authorize() } + + resetSessions(): void { + if (this.params.isMainConnection) { + for (const conn of this._connections) { + conn._resetSession() + } + } else { + this._connections[0]._resetSession() + } + } + + changeTransport(factory: TransportFactory): void { + this._connections.forEach((conn) => conn.changeTransport(factory)) + } } diff --git a/packages/core/src/network/network-manager.ts b/packages/core/src/network/network-manager.ts index 6b817c1f..233c144c 100644 --- a/packages/core/src/network/network-manager.ts +++ b/packages/core/src/network/network-manager.ts @@ -500,7 +500,7 @@ export class NetworkManager { dcId, allowIpv6: this.params.useIpv6, preferIpv6: this.params.useIpv6, - preferMedia: dcId !== this._primaryDc?.dcId, + allowMedia: false, cdn: false, }) @@ -541,14 +541,6 @@ export class NetworkManager { dcId: manager.dcId, }) - // manager.ensureMainConnection() - // - // if (!manager.main._sessions[0]._authKey.ready) { - // await manager.loadKeys() - // } - // - // manager.main.ensureConnected() - const res = await this.call( { _: 'auth.importAuthorization', @@ -584,13 +576,26 @@ export class NetworkManager { } async notifyLoggedIn(auth: tl.auth.TypeAuthorization): Promise { - if (auth._ === 'auth.authorizationSignUpRequired') return + if ( + auth._ === 'auth.authorizationSignUpRequired' || + auth.user._ === 'userEmpty' + ) { return } if (auth.tmpSessions) { this._primaryDc?.main.setCount(auth.tmpSessions) } - // await this.exportAuth() + await this.exportAuth() + } + + resetSessions(): void { + const dc = this._primaryDc + if (!dc) return + + dc.main.resetSessions() + dc.upload.resetSessions() + dc.download.resetSessions() + dc.downloadSmall.resetSessions() } private _onConfigChanged(config: tl.RawConfig): void { @@ -607,6 +612,7 @@ export class NetworkManager { allowIpv6: this.params.useIpv6, preferIpv6: this.params.useIpv6, cdn: false, + allowMedia: false, }) if (!option) { @@ -750,6 +756,19 @@ export class NetworkManager { throw lastError } + setUpdateHandler(handler: NetworkManager['_updateHandler']): void { + this._updateHandler = handler + } + + changeTransport(factory: TransportFactory): void { + Object.values(this._dcConnections).forEach((dc) => { + dc.main.changeTransport(factory) + dc.upload.changeTransport(factory) + dc.download.changeTransport(factory) + dc.downloadSmall.changeTransport(factory) + }) + } + destroy(): void { for (const dc of Object.values(this._dcConnections)) { dc.main.destroy() diff --git a/packages/core/src/network/persistent-connection.ts b/packages/core/src/network/persistent-connection.ts index cc31261d..61bb5489 100644 --- a/packages/core/src/network/persistent-connection.ts +++ b/packages/core/src/network/persistent-connection.ts @@ -30,7 +30,7 @@ export abstract class PersistentConnection extends EventEmitter { private _uid = nextConnectionUid++ readonly params: PersistentConnectionParams - private _transport!: ITelegramTransport + protected _transport!: ITelegramTransport private _sendOnceConnected: Buffer[] = [] diff --git a/packages/core/src/network/session-connection.ts b/packages/core/src/network/session-connection.ts index fa995d9c..546fc64c 100644 --- a/packages/core/src/network/session-connection.ts +++ b/packages/core/src/network/session-connection.ts @@ -261,7 +261,7 @@ export class SessionConnection extends PersistentConnection { } } - this.emit('error', error) + this.emit('api-error', error) } protected onConnectionUsable() { @@ -271,7 +271,10 @@ export class SessionConnection extends PersistentConnection { // we must send some user-related rpc to the server to make sure that // it will send us updates this.sendRpc({ _: 'updates.getState' }).catch((err: any) => { - this.log.warn('failed to send updates.getState: %s', err) + this.log.warn( + 'failed to send updates.getState: %s', + err.text || err.message, + ) }) } @@ -1453,11 +1456,13 @@ export class SessionConnection extends PersistentConnection { method, this.params.layer, ) + const proxy = this._transport.getMtproxyInfo?.() obj = { _: 'invokeWithLayer', layer: this.params.layer, query: { ...this.params.initConnection, + proxy, query: obj, }, } diff --git a/packages/core/src/network/transports/abstract.ts b/packages/core/src/network/transports/abstract.ts index d0399746..435f0718 100644 --- a/packages/core/src/network/transports/abstract.ts +++ b/packages/core/src/network/transports/abstract.ts @@ -58,6 +58,8 @@ export interface ITelegramTransport extends EventEmitter { * This method is called before any other. */ setup?(crypto: ICryptoProvider, log: Logger): void + + getMtproxyInfo?(): tl.RawInputClientProxy } /** Transport factory function */ diff --git a/packages/mtproxy/index.ts b/packages/mtproxy/index.ts index d5c82a7e..c09901e0 100644 --- a/packages/mtproxy/index.ts +++ b/packages/mtproxy/index.ts @@ -91,10 +91,20 @@ export class MtProxyTcpTransport extends BaseTcpTransport { } } + getMtproxyInfo(): tl.RawInputClientProxy { + return { + _: 'inputClientProxy', + address: this._proxy.host, + port: this._proxy.port, + } + } + _packetCodec!: IPacketCodec connect(dc: tl.RawDcOption, testMode: boolean): void { - if (this._state !== TransportState.Idle) { throw new Error('Transport is not IDLE') } + if (this._state !== TransportState.Idle) { + throw new Error('Transport is not IDLE') + } if (this._packetCodec && this._currentDc?.id !== dc.id) { // dc changed, thus the codec's init will change too diff --git a/packages/socks-proxy/package.json b/packages/socks-proxy/package.json index 6e3540b6..d84ffe23 100644 --- a/packages/socks-proxy/package.json +++ b/packages/socks-proxy/package.json @@ -12,6 +12,6 @@ }, "dependencies": { "@mtcute/core": "workspace:^1.0.0", - "ip6": "0.2.10" + "ip6": "0.2.7" } } diff --git a/packages/sqlite/index.ts b/packages/sqlite/index.ts index ee005e5a..e4e73b10 100644 --- a/packages/sqlite/index.ts +++ b/packages/sqlite/index.ts @@ -401,7 +401,8 @@ export class SqliteStorage implements ITelegramStorage, IStateStorage { private _initializeStatements(): void { this._statements = {} as unknown as typeof this._statements Object.entries(STATEMENTS).forEach(([name, sql]) => { - this._statements[name as keyof typeof this._statements] = this._db.prepare(sql) + this._statements[name as keyof typeof this._statements] = + this._db.prepare(sql) }) } @@ -417,7 +418,7 @@ export class SqliteStorage implements ITelegramStorage, IStateStorage { const versionResult = this._db .prepare("select value from kv where key = 'ver'") .get() - const version = (versionResult as { value: number }).value + const version = Number((versionResult as { value: number }).value) this.log.debug('current db version = %d', version) @@ -446,7 +447,10 @@ export class SqliteStorage implements ITelegramStorage, IStateStorage { load(): void { this._db = sqlite3(this._filename, { - verbose: this.log.mgr.level === 5 ? this.log.verbose as Options['verbose'] : undefined, + verbose: + this.log.mgr.level === 5 ? + (this.log.verbose as Options['verbose']) : + undefined, }) this._initialize() @@ -648,7 +652,9 @@ export class SqliteStorage implements ITelegramStorage, IStateStorage { const cached = this._cache?.get(peerId) if (cached) return cached.peer - const row = this._statements.getEntById.get(peerId) as SqliteEntity | null + const row = this._statements.getEntById.get( + peerId, + ) as SqliteEntity | null if (row) { const peer = getInputPeer(row) @@ -664,7 +670,9 @@ export class SqliteStorage implements ITelegramStorage, IStateStorage { } getPeerByPhone(phone: string): tl.TypeInputPeer | null { - const row = this._statements.getEntByPhone.get(phone) as SqliteEntity | null + const row = this._statements.getEntByPhone.get( + phone, + ) as SqliteEntity | null if (row) { const peer = getInputPeer(row) @@ -680,7 +688,9 @@ export class SqliteStorage implements ITelegramStorage, IStateStorage { } getPeerByUsername(username: string): tl.TypeInputPeer | null { - const row = this._statements.getEntByUser.get(username.toLowerCase()) as SqliteEntity | null + const row = this._statements.getEntByUser.get( + username.toLowerCase(), + ) as SqliteEntity | null if (!row || Date.now() - row.updated > USERNAME_TTL) return null if (row) { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 50113116..4ff6af0b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1,5 +1,9 @@ lockfileVersion: '6.0' +settings: + autoInstallPeers: true + excludeLinksFromLockfile: false + importers: .: @@ -272,8 +276,8 @@ importers: specifier: workspace:^1.0.0 version: link:../core ip6: - specifier: 0.2.10 - version: 0.2.10 + specifier: 0.2.7 + version: 0.2.7 packages/sqlite: dependencies: @@ -3012,6 +3016,7 @@ packages: /iconv-lite@0.6.3: resolution: {integrity: sha512-4fCk79wshMdzMp2rH06qWrJE4iolqLhCUH+OiuIgU++RB0+94NlDL81atO7GX55uUKueo0txHNtvEyI6D7WdMw==} engines: {node: '>=0.10.0'} + requiresBuild: true dependencies: safer-buffer: 2.1.2 dev: false @@ -3067,8 +3072,8 @@ packages: side-channel: 1.0.4 dev: true - /ip6@0.2.10: - resolution: {integrity: sha512-1LdpyKjhvepd6EbAU6rW4g14vuYtx5TnJX9TfZZBhsM6DsyPQLNzW12rtbUqXBMwqFrLVV/Gcxv0GNFvJp2cYA==} + /ip6@0.2.7: + resolution: {integrity: sha512-zEzGsxn4Uw33TByv0DdX/RRh+VsGfEctOp7CvJq/b4JEjY9OvPB58dsMYiEwIVLsIWHZSJPn3XG8mP9Qv3TG3g==} hasBin: true dev: false @@ -4580,6 +4585,7 @@ packages: /safer-buffer@2.1.2: resolution: {integrity: sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg==} + requiresBuild: true dev: false optional: true