fix(core): handle MTPROTO_CLUSTER_INVALID
This commit is contained in:
parent
bb5d2ef676
commit
5d0cdc421a
7 changed files with 69 additions and 5 deletions
|
@ -733,6 +733,7 @@ withParams(params: RpcCallOptions): this\n`)
|
||||||
'computeNewPasswordHash',
|
'computeNewPasswordHash',
|
||||||
'changePrimaryDc',
|
'changePrimaryDc',
|
||||||
'getMtprotoMessageId',
|
'getMtprotoMessageId',
|
||||||
|
'recreateDc',
|
||||||
].forEach((name) => {
|
].forEach((name) => {
|
||||||
output.write(
|
output.write(
|
||||||
`TelegramClient.prototype.${name} = function(...args) {\n`
|
`TelegramClient.prototype.${name} = function(...args) {\n`
|
||||||
|
|
|
@ -346,4 +346,9 @@ export class BaseTelegramClient implements ITelegramClient {
|
||||||
async getMtprotoMessageId(): Promise<Long> {
|
async getMtprotoMessageId(): Promise<Long> {
|
||||||
return this.mt.network.getMtprotoMessageId()
|
return this.mt.network.getMtprotoMessageId()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async recreateDc(dcId: number): Promise<void> {
|
||||||
|
await this.mt.network.config.update(true)
|
||||||
|
await this.mt.network.recreateDc(dcId)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3445,8 +3445,6 @@ export interface TelegramClient extends ITelegramClient {
|
||||||
/**
|
/**
|
||||||
* Maximum message ID to return.
|
* Maximum message ID to return.
|
||||||
*
|
*
|
||||||
* Unless {@link addOffset} is used, this will work the same as {@link offset}.
|
|
||||||
*
|
|
||||||
* @default `0` (disabled).
|
* @default `0` (disabled).
|
||||||
*/
|
*/
|
||||||
maxId?: number
|
maxId?: number
|
||||||
|
@ -6946,3 +6944,6 @@ TelegramClient.prototype.changePrimaryDc = function (...args) {
|
||||||
TelegramClient.prototype.getMtprotoMessageId = function (...args) {
|
TelegramClient.prototype.getMtprotoMessageId = function (...args) {
|
||||||
return this._client.getMtprotoMessageId(...args)
|
return this._client.getMtprotoMessageId(...args)
|
||||||
}
|
}
|
||||||
|
TelegramClient.prototype.recreateDc = function (...args) {
|
||||||
|
return this._client.recreateDc(...args)
|
||||||
|
}
|
||||||
|
|
|
@ -72,4 +72,6 @@ export interface ITelegramClient {
|
||||||
computeSrpParams(request: tl.account.RawPassword, password: string): Promise<tl.RawInputCheckPasswordSRP>
|
computeSrpParams(request: tl.account.RawPassword, password: string): Promise<tl.RawInputCheckPasswordSRP>
|
||||||
computeNewPasswordHash(algo: tl.TypePasswordKdfAlgo, password: string): Promise<Uint8Array>
|
computeNewPasswordHash(algo: tl.TypePasswordKdfAlgo, password: string): Promise<Uint8Array>
|
||||||
getMtprotoMessageId(): Promise<Long>
|
getMtprotoMessageId(): Promise<Long>
|
||||||
|
|
||||||
|
recreateDc(dcId: number): Promise<void>
|
||||||
}
|
}
|
||||||
|
|
|
@ -145,6 +145,12 @@ export async function* downloadAsIterable(
|
||||||
// todo: implement someday
|
// todo: implement someday
|
||||||
// see: https://github.com/LonamiWebs/Telethon/blob/0e8bd8248cc649637b7c392616887c50986427a0/telethon/client/downloads.py#L99
|
// see: https://github.com/LonamiWebs/Telethon/blob/0e8bd8248cc649637b7c392616887c50986427a0/telethon/client/downloads.py#L99
|
||||||
throw new MtUnsupportedError('File ref expired!')
|
throw new MtUnsupportedError('File ref expired!')
|
||||||
|
} else if (e.is('MTPROTO_CLUSTER_INVALID') && dcId != null) {
|
||||||
|
// this is a weird error that happens when we are trying to download a file from a "wrong" media dc
|
||||||
|
// (e.g. this happens when we load media dc ip from session, but the current media dc ip is different)
|
||||||
|
client.log.debug('received cluster invalid error for dc %d, recreating dc', dcId)
|
||||||
|
await client.recreateDc(dcId)
|
||||||
|
return downloadChunk(chunk)
|
||||||
} else {
|
} else {
|
||||||
throw e
|
throw e
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,6 +56,7 @@ export abstract class TelegramWorkerPort<Custom extends WorkerCustomMethods> imp
|
||||||
readonly startUpdatesLoop: ITelegramClient['startUpdatesLoop']
|
readonly startUpdatesLoop: ITelegramClient['startUpdatesLoop']
|
||||||
readonly stopUpdatesLoop: ITelegramClient['stopUpdatesLoop']
|
readonly stopUpdatesLoop: ITelegramClient['stopUpdatesLoop']
|
||||||
readonly getMtprotoMessageId: ITelegramClient['getMtprotoMessageId']
|
readonly getMtprotoMessageId: ITelegramClient['getMtprotoMessageId']
|
||||||
|
readonly recreateDc: ITelegramClient['recreateDc']
|
||||||
|
|
||||||
private _abortController = new AbortController()
|
private _abortController = new AbortController()
|
||||||
readonly stopSignal: AbortSignal = this._abortController.signal
|
readonly stopSignal: AbortSignal = this._abortController.signal
|
||||||
|
@ -92,6 +93,7 @@ export abstract class TelegramWorkerPort<Custom extends WorkerCustomMethods> imp
|
||||||
this.startUpdatesLoop = bind('startUpdatesLoop')
|
this.startUpdatesLoop = bind('startUpdatesLoop')
|
||||||
this.stopUpdatesLoop = bind('stopUpdatesLoop')
|
this.stopUpdatesLoop = bind('stopUpdatesLoop')
|
||||||
this.getMtprotoMessageId = bind('getMtprotoMessageId')
|
this.getMtprotoMessageId = bind('getMtprotoMessageId')
|
||||||
|
this.recreateDc = bind('recreateDc')
|
||||||
|
|
||||||
this.timers = new TimersManager()
|
this.timers = new TimersManager()
|
||||||
this.timers.onError(err => this.onError.emit(unknownToError(err)))
|
this.timers.onError(err => this.onError.emit(unknownToError(err)))
|
||||||
|
|
|
@ -487,6 +487,7 @@ export class NetworkManager {
|
||||||
|
|
||||||
protected readonly _dcConnections: Map<number, DcConnectionManager> = new Map()
|
protected readonly _dcConnections: Map<number, DcConnectionManager> = new Map()
|
||||||
protected _primaryDc?: DcConnectionManager
|
protected _primaryDc?: DcConnectionManager
|
||||||
|
protected _primaryDcRecreationPromise?: Deferred<void>
|
||||||
|
|
||||||
private _updateHandler: (upd: tl.TypeUpdates, fromClient: boolean) => void
|
private _updateHandler: (upd: tl.TypeUpdates, fromClient: boolean) => void
|
||||||
|
|
||||||
|
@ -804,7 +805,11 @@ export class NetworkManager {
|
||||||
params?: RpcCallOptions,
|
params?: RpcCallOptions,
|
||||||
): Promise<tl.RpcCallReturn[T['_']] | mtp.RawMt_rpc_error> => {
|
): Promise<tl.RpcCallReturn[T['_']] | mtp.RawMt_rpc_error> => {
|
||||||
if (!this._primaryDc) {
|
if (!this._primaryDc) {
|
||||||
throw new MtcuteError('Not connected to any DC')
|
if (this._primaryDcRecreationPromise) {
|
||||||
|
await this._primaryDcRecreationPromise.promise
|
||||||
|
} else {
|
||||||
|
throw new MtcuteError('Not connected to any DC')
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const kind = params?.kind ?? 'main'
|
const kind = params?.kind ?? 'main'
|
||||||
|
@ -812,10 +817,10 @@ export class NetworkManager {
|
||||||
|
|
||||||
if (params?.manager) {
|
if (params?.manager) {
|
||||||
manager = params.manager
|
manager = params.manager
|
||||||
} else if (params?.dcId && params.dcId !== this._primaryDc.dcId) {
|
} else if (params?.dcId && params.dcId !== this._primaryDc!.dcId) {
|
||||||
manager = await this._getOtherDc(params.dcId)
|
manager = await this._getOtherDc(params.dcId)
|
||||||
} else {
|
} else {
|
||||||
manager = this._primaryDc
|
manager = this._primaryDc!
|
||||||
}
|
}
|
||||||
|
|
||||||
let multi = manager[kind]
|
let multi = manager[kind]
|
||||||
|
@ -912,4 +917,46 @@ export class NetworkManager {
|
||||||
getMtprotoMessageId(): Long {
|
getMtprotoMessageId(): Long {
|
||||||
return this._primaryDc!.main._sessions[0].getMessageId()
|
return this._primaryDc!.main._sessions[0].getMessageId()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async recreateDc(dcId: number): Promise<void> {
|
||||||
|
this._log.debug('recreating dc %d', dcId)
|
||||||
|
const existing = this._dcConnections.get(dcId)
|
||||||
|
if (existing) {
|
||||||
|
await existing.destroy()
|
||||||
|
}
|
||||||
|
|
||||||
|
this._dcConnections.delete(dcId)
|
||||||
|
|
||||||
|
if (dcId === this._primaryDc?.dcId) {
|
||||||
|
const oldPrimaryDc = this._primaryDc
|
||||||
|
this._primaryDc = undefined
|
||||||
|
this._primaryDcRecreationPromise = new Deferred()
|
||||||
|
|
||||||
|
try {
|
||||||
|
const newDefaultDcs: DcOptions = {
|
||||||
|
main: asNonNull(await this.config.findOption({
|
||||||
|
dcId,
|
||||||
|
allowIpv6: this.params.useIpv6,
|
||||||
|
})),
|
||||||
|
media: asNonNull(await this.config.findOption({
|
||||||
|
dcId,
|
||||||
|
allowIpv6: this.params.useIpv6,
|
||||||
|
allowMedia: true,
|
||||||
|
preferMedia: true,
|
||||||
|
})),
|
||||||
|
}
|
||||||
|
|
||||||
|
await this._storage.dcs.store(newDefaultDcs)
|
||||||
|
await this.connect(newDefaultDcs)
|
||||||
|
this._primaryDcRecreationPromise.resolve()
|
||||||
|
this._primaryDcRecreationPromise = undefined
|
||||||
|
} catch (e) {
|
||||||
|
// restore old primary dc to avoid a deadlock
|
||||||
|
this._primaryDc = oldPrimaryDc
|
||||||
|
this._primaryDcRecreationPromise?.resolve()
|
||||||
|
this._primaryDcRecreationPromise = undefined
|
||||||
|
throw e
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue