fix: improved reconnecting
All checks were successful
Tests / test-deno (push) Successful in 1m44s
Tests / test-bun (push) Successful in 1m51s
Tests / test-node (node22) (push) Successful in 1m57s
Tests / test-node (node20) (push) Successful in 2m2s
Tests / test-node (node18) (push) Successful in 2m5s
Tests / test-web (chromium) (push) Successful in 2m1s
Tests / test-web (firefox) (push) Successful in 2m10s
Build and deploy typedoc / build (push) Successful in 5m58s
Tests / lint (push) Successful in 6m36s
Tests / e2e (push) Successful in 46s
Tests / e2e-deno (push) Successful in 54s
All checks were successful
Tests / test-deno (push) Successful in 1m44s
Tests / test-bun (push) Successful in 1m51s
Tests / test-node (node22) (push) Successful in 1m57s
Tests / test-node (node20) (push) Successful in 2m2s
Tests / test-node (node18) (push) Successful in 2m5s
Tests / test-web (chromium) (push) Successful in 2m1s
Tests / test-web (firefox) (push) Successful in 2m10s
Build and deploy typedoc / build (push) Successful in 5m58s
Tests / lint (push) Successful in 6m36s
Tests / e2e (push) Successful in 46s
Tests / e2e-deno (push) Successful in 54s
This commit is contained in:
parent
6c9f4c897d
commit
cc7ca2997e
2 changed files with 16 additions and 18 deletions
|
@ -5,7 +5,7 @@ import type { IPacketCodec, ITelegramConnection, TelegramTransport } from './tra
|
||||||
|
|
||||||
import { FramedReader, FramedWriter } from '@fuman/io'
|
import { FramedReader, FramedWriter } from '@fuman/io'
|
||||||
|
|
||||||
import { PersistentConnection as FumanPersistentConnection } from '@fuman/net'
|
import { ConnectionClosedError, PersistentConnection as FumanPersistentConnection } from '@fuman/net'
|
||||||
import { Emitter, timers } from '@fuman/utils'
|
import { Emitter, timers } from '@fuman/utils'
|
||||||
|
|
||||||
export interface PersistentConnectionParams {
|
export interface PersistentConnectionParams {
|
||||||
|
@ -140,7 +140,7 @@ export abstract class PersistentConnection {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async _onClose() {
|
private _onClose(): void {
|
||||||
this.log.debug('connection closed')
|
this.log.debug('connection closed')
|
||||||
this._updateLogPrefix()
|
this._updateLogPrefix()
|
||||||
|
|
||||||
|
@ -149,9 +149,12 @@ export abstract class PersistentConnection {
|
||||||
this.onClosed()
|
this.onClosed()
|
||||||
}
|
}
|
||||||
|
|
||||||
private async _onError(err: Error) {
|
private _onError(err: Error) {
|
||||||
this._updateLogPrefix()
|
this._updateLogPrefix()
|
||||||
this.handleError(err)
|
|
||||||
|
if (!(err instanceof ConnectionClosedError)) {
|
||||||
|
this.handleError(err)
|
||||||
|
}
|
||||||
|
|
||||||
return 'reconnect' as const
|
return 'reconnect' as const
|
||||||
}
|
}
|
||||||
|
@ -231,7 +234,13 @@ export abstract class PersistentConnection {
|
||||||
|
|
||||||
if (this._writer) {
|
if (this._writer) {
|
||||||
this._rescheduleInactivity()
|
this._rescheduleInactivity()
|
||||||
await this._writer.write(data)
|
try {
|
||||||
|
await this._writer.write(data)
|
||||||
|
} catch (e: unknown) {
|
||||||
|
this.log.warn('encountered an error closed while sending, reconnecting: %e', e)
|
||||||
|
this._fuman.reconnect(true)
|
||||||
|
this._sendOnceConnected.push(data)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
this._sendOnceConnected.push(data)
|
this._sendOnceConnected.push(data)
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,8 +10,8 @@ import { Deferred, Emitter, timers, u8 } from '@fuman/utils'
|
||||||
import { tl } from '@mtcute/tl'
|
import { tl } from '@mtcute/tl'
|
||||||
import { TlBinaryReader, TlBinaryWriter, TlSerializationCounter } from '@mtcute/tl-runtime'
|
import { TlBinaryReader, TlBinaryWriter, TlSerializationCounter } from '@mtcute/tl-runtime'
|
||||||
import Long from 'long'
|
import Long from 'long'
|
||||||
import { MtArgumentError, MtcuteError, MtTimeoutError } from '../types/index.js'
|
|
||||||
|
|
||||||
|
import { MtArgumentError, MtcuteError, MtTimeoutError } from '../types/index.js'
|
||||||
import { createAesIgeForMessageOld } from '../utils/crypto/mtproto.js'
|
import { createAesIgeForMessageOld } from '../utils/crypto/mtproto.js'
|
||||||
import {
|
import {
|
||||||
EarlyTimer,
|
EarlyTimer,
|
||||||
|
@ -2050,18 +2050,7 @@ export class SessionConnection extends PersistentConnection {
|
||||||
)
|
)
|
||||||
|
|
||||||
const enc = this._session.encryptMessage(result)
|
const enc = this._session.encryptMessage(result)
|
||||||
const promise = this.send(enc).catch((err: Error) => {
|
const promise = this.send(enc)
|
||||||
if (this._destroyed) return
|
|
||||||
this.log.error('error while sending pending messages (root msg_id = %l): %e', rootMsgId, err)
|
|
||||||
|
|
||||||
// put acks in the front so they are the first to be sent
|
|
||||||
if (ackMsgIds) {
|
|
||||||
this._session.queuedAcks.splice(0, 0, ...ackMsgIds)
|
|
||||||
}
|
|
||||||
if (rootMsgId) {
|
|
||||||
this._onMessageFailed(rootMsgId, 'unknown error')
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
if (this._inactivityPendingFlush && !this._session.hasPendingMessages) {
|
if (this._inactivityPendingFlush && !this._session.hasPendingMessages) {
|
||||||
void promise.then(() => {
|
void promise.then(() => {
|
||||||
|
|
Loading…
Reference in a new issue