network middlewares #60
10 changed files with 377 additions and 9 deletions
|
@ -11,7 +11,9 @@ import {
|
|||
asyncResettable,
|
||||
computeNewPasswordHash,
|
||||
computeSrpParams,
|
||||
ICryptoProvider,
|
||||
isTlRpcError,
|
||||
Logger,
|
||||
readStringSession,
|
||||
StringSessionData,
|
||||
writeStringSession,
|
||||
|
@ -45,10 +47,10 @@ export class BaseTelegramClient implements ITelegramClient {
|
|||
private _serverUpdatesHandler: ServerUpdateHandler = () => {}
|
||||
private _connectionStateHandler: (state: ConnectionState) => void = () => {}
|
||||
|
||||
readonly log
|
||||
readonly mt
|
||||
readonly crypto
|
||||
readonly storage
|
||||
readonly log: Logger
|
||||
readonly mt: MtClient
|
||||
readonly crypto: ICryptoProvider
|
||||
readonly storage: TelegramStorageManager
|
||||
|
||||
constructor(readonly params: BaseTelegramClientOptions) {
|
||||
this.log = this.params.logger ?? new LogManager('client')
|
||||
|
|
|
@ -1,5 +1,12 @@
|
|||
export * from './client.js'
|
||||
export type { ConnectionKind, NetworkManagerExtraParams, RpcCallOptions } from './network-manager.js'
|
||||
export * from './middlewares/index.js'
|
||||
export type {
|
||||
ConnectionKind,
|
||||
NetworkManagerExtraParams,
|
||||
RpcCallMiddleware,
|
||||
RpcCallMiddlewareContext,
|
||||
RpcCallOptions,
|
||||
} from './network-manager.js'
|
||||
export * from './reconnection.js'
|
||||
export * from './session-connection.js'
|
||||
export * from './transports/index.js'
|
||||
|
|
2
packages/core/src/network/middlewares/bundle.ts
Normal file
2
packages/core/src/network/middlewares/bundle.ts
Normal file
|
@ -0,0 +1,2 @@
|
|||
export * from './on-error.js'
|
||||
export * from './on-method.js'
|
3
packages/core/src/network/middlewares/index.ts
Normal file
3
packages/core/src/network/middlewares/index.ts
Normal file
|
@ -0,0 +1,3 @@
|
|||
import * as networkMiddlewares from './bundle.js'
|
||||
|
||||
export { networkMiddlewares }
|
49
packages/core/src/network/middlewares/on-error.ts
Normal file
49
packages/core/src/network/middlewares/on-error.ts
Normal file
|
@ -0,0 +1,49 @@
|
|||
import { mtp } from '@mtcute/tl'
|
||||
|
||||
import { MaybePromise } from '../../types/utils.js'
|
||||
import { isTlRpcError } from '../../utils/type-assertions.js'
|
||||
import { RpcCallMiddleware, RpcCallMiddlewareContext } from '../network-manager.js'
|
||||
|
||||
/**
|
||||
* Middleware that will call `handler` whenever an RPC error happens,
|
||||
* with the error object itself.
|
||||
*
|
||||
* The handler can either return nothing
|
||||
* (in which case the original error will be thrown), a new error
|
||||
* (via the `_: 'mt_rpc_error'` object), or any other value, which
|
||||
* will be returned as the result of the RPC call.
|
||||
*
|
||||
* Note that the return value is **not type-checked**
|
||||
* due to limitations of TypeScript. You'll probably want to use `satisfies`
|
||||
* keyword to ensure the return value is correct, for example:
|
||||
*
|
||||
* ```ts
|
||||
* networkMiddlewares.onRpcError(async (ctx, error) => {
|
||||
* if (rpc.request._ === 'help.getNearestDc') {
|
||||
* return {
|
||||
* _: 'nearestDc',
|
||||
* country: 'RU',
|
||||
* thisDc: 2,
|
||||
* nearestDc: 2,
|
||||
* } satisfies tl.RpcCallReturn['help.getNearestDc']
|
||||
* }
|
||||
* })
|
||||
* ```
|
||||
*/
|
||||
export function onRpcError(
|
||||
handler: (ctx: RpcCallMiddlewareContext, error: mtp.RawMt_rpc_error) => MaybePromise<unknown>,
|
||||
): RpcCallMiddleware {
|
||||
return async (ctx, next) => {
|
||||
let res = await next(ctx)
|
||||
|
||||
if (isTlRpcError(res)) {
|
||||
const handlerRes = await handler(ctx, res)
|
||||
|
||||
if (handlerRes !== undefined) {
|
||||
res = handlerRes
|
||||
}
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
}
|
54
packages/core/src/network/middlewares/on-method.ts
Normal file
54
packages/core/src/network/middlewares/on-method.ts
Normal file
|
@ -0,0 +1,54 @@
|
|||
import { tl } from '@mtcute/tl'
|
||||
|
||||
import { Middleware } from '../../utils/composer.js'
|
||||
import { RpcCallMiddleware, RpcCallMiddlewareContext } from '../network-manager.js'
|
||||
|
||||
/**
|
||||
* Middleware that will call `handler` whenever `method` RPC method is called.
|
||||
*
|
||||
* This helper exists due to TypeScript limitations not allowing us to
|
||||
* properly type the return type without explicit type annotations,
|
||||
* for a bit more type-safe and clean code:
|
||||
*
|
||||
* ```ts
|
||||
* // before
|
||||
* async (ctx, next) => {
|
||||
* if (rpc.request._ === 'help.getNearestDc') {
|
||||
* return {
|
||||
* _: 'nearestDc',
|
||||
* country: 'RU',
|
||||
* thisDc: 2,
|
||||
* nearestDc: 2,
|
||||
* } satisfies tl.RpcCallReturn['help.getNearestDc']
|
||||
* }
|
||||
*
|
||||
* return next(ctx)
|
||||
* }
|
||||
*
|
||||
* // after
|
||||
* onMethod('help.getNearestDc', async () => ({
|
||||
* _: 'nearestDc' as const, // (otherwise ts will infer this as `string` and will complain)
|
||||
* country: 'RU',
|
||||
* thisDc: 2,
|
||||
* nearestDc: 2,
|
||||
* })
|
||||
* ```
|
||||
*/
|
||||
export function onMethod<T extends tl.RpcMethod['_']>(
|
||||
method: T,
|
||||
middleware: Middleware<
|
||||
Omit<RpcCallMiddlewareContext, 'request'> & {
|
||||
request: Extract<tl.RpcMethod, { _: T }>
|
||||
},
|
||||
tl.RpcCallReturn[T]
|
||||
>,
|
||||
): RpcCallMiddleware {
|
||||
return async (ctx, next) => {
|
||||
if (ctx.request._ !== method) {
|
||||
return next(ctx)
|
||||
}
|
||||
|
||||
// eslint-disable-next-line
|
||||
return middleware(ctx as any, next)
|
||||
}
|
||||
}
|
|
@ -4,6 +4,7 @@ import { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime'
|
|||
import { getPlatform } from '../platform.js'
|
||||
import { StorageManager } from '../storage/storage.js'
|
||||
import { MtArgumentError, MtcuteError, MtTimeoutError, MtUnsupportedError } from '../types/index.js'
|
||||
import { ComposedMiddleware, composeMiddlewares, Middleware } from '../utils/composer.js'
|
||||
import {
|
||||
ControllablePromise,
|
||||
createControllablePromise,
|
||||
|
@ -113,6 +114,14 @@ export interface NetworkManagerExtraParams {
|
|||
* @default 60000 (60 seconds).
|
||||
*/
|
||||
inactivityTimeout?: number
|
||||
|
||||
/**
|
||||
* List of middlewares to use for the network manager
|
||||
*
|
||||
* > **Note**: these middlewares apply to **outgoing requests only**.
|
||||
* > If you need to handle incoming updates, use a {@link Dispatcher} instead.
|
||||
*/
|
||||
middlewares?: Middleware<RpcCallMiddlewareContext, unknown>[]
|
||||
}
|
||||
|
||||
/** Options that can be customized when making an RPC call */
|
||||
|
@ -198,6 +207,13 @@ export interface RpcCallOptions {
|
|||
chainId?: string | number
|
||||
}
|
||||
|
||||
export interface RpcCallMiddlewareContext {
|
||||
request: tl.RpcMethod
|
||||
manager: NetworkManager
|
||||
params?: RpcCallOptions
|
||||
}
|
||||
export type RpcCallMiddleware<Result = unknown> = Middleware<RpcCallMiddlewareContext, Result>
|
||||
|
||||
/**
|
||||
* Wrapper over all connection pools for a single DC.
|
||||
*/
|
||||
|
@ -501,6 +517,8 @@ export class NetworkManager {
|
|||
this._connectionCount = params.connectionCount ?? defaultConnectionCountDelegate
|
||||
this._updateHandler = params.onUpdate
|
||||
|
||||
this.call = this._composeCall(params.middlewares)
|
||||
|
||||
this._onConfigChanged = this._onConfigChanged.bind(this)
|
||||
config.onReload(this._onConfigChanged)
|
||||
|
||||
|
@ -752,11 +770,34 @@ export class NetworkManager {
|
|||
await this._switchPrimaryDc(this._dcConnections.get(newDc)!)
|
||||
}
|
||||
|
||||
private _floodWaitedRequests = new Map<string, number>()
|
||||
async call<T extends tl.RpcMethod>(
|
||||
readonly call: <T extends tl.RpcMethod>(
|
||||
message: T,
|
||||
params?: RpcCallOptions,
|
||||
): Promise<tl.RpcCallReturn[T['_']] | mtp.RawMt_rpc_error> {
|
||||
) => Promise<tl.RpcCallReturn[T['_']] | mtp.RawMt_rpc_error>
|
||||
|
||||
private _composeCall = (middlewares?: Middleware<RpcCallMiddlewareContext, unknown>[]) => {
|
||||
if (!middlewares?.length) {
|
||||
return this._call
|
||||
}
|
||||
|
||||
const final: ComposedMiddleware<RpcCallMiddlewareContext, unknown> = async (ctx) => {
|
||||
return this._call(ctx.request, ctx.params)
|
||||
}
|
||||
const composed = composeMiddlewares(middlewares, final)
|
||||
|
||||
return async <T extends tl.RpcMethod>(message: T, params?: RpcCallOptions): Promise<tl.RpcCallReturn[T['_']]> =>
|
||||
composed({
|
||||
request: message,
|
||||
manager: this,
|
||||
params,
|
||||
})
|
||||
}
|
||||
|
||||
private _floodWaitedRequests = new Map<string, number>()
|
||||
private _call = async <T extends tl.RpcMethod>(
|
||||
message: T,
|
||||
params?: RpcCallOptions,
|
||||
): Promise<tl.RpcCallReturn[T['_']] | mtp.RawMt_rpc_error> => {
|
||||
if (!this._primaryDc) {
|
||||
throw new MtcuteError('Not connected to any DC')
|
||||
}
|
||||
|
@ -871,7 +912,7 @@ export class NetworkManager {
|
|||
if (params?.localMigrate) {
|
||||
manager = await this._getOtherDc(newDc)
|
||||
} else {
|
||||
this._log.info('Migrate error, new dc = %d', newDc)
|
||||
this._log.info('received %s, migrating to dc %d', err, newDc)
|
||||
|
||||
await this.changePrimaryDc(newDc)
|
||||
manager = this._primaryDc!
|
||||
|
|
179
packages/core/src/utils/composer.test.ts
Normal file
179
packages/core/src/utils/composer.test.ts
Normal file
|
@ -0,0 +1,179 @@
|
|||
/* eslint-disable @typescript-eslint/require-await */
|
||||
import { describe, expect, it } from 'vitest'
|
||||
|
||||
import { composeMiddlewares, Middleware } from './composer.js'
|
||||
|
||||
describe('composeMiddlewares', () => {
|
||||
it('should compose middlewares', async () => {
|
||||
const trace: unknown[] = []
|
||||
const middlewares: Middleware<number[]>[] = [
|
||||
async (ctx, next) => {
|
||||
trace.push(ctx)
|
||||
trace.push(1)
|
||||
await next([...ctx, 1])
|
||||
trace.push(6)
|
||||
},
|
||||
async (ctx, next) => {
|
||||
trace.push(ctx)
|
||||
trace.push(2)
|
||||
await next([...ctx, 2])
|
||||
trace.push(5)
|
||||
},
|
||||
async (ctx, next) => {
|
||||
trace.push(ctx)
|
||||
trace.push(3)
|
||||
await next([...ctx, 3])
|
||||
trace.push(4)
|
||||
},
|
||||
]
|
||||
|
||||
const composed = composeMiddlewares(middlewares, async (res) => {
|
||||
result = res
|
||||
})
|
||||
|
||||
let result: readonly number[] = []
|
||||
await composed([])
|
||||
|
||||
expect(trace).toEqual([[], 1, [1], 2, [1, 2], 3, 4, 5, 6])
|
||||
expect(result).toEqual([1, 2, 3])
|
||||
})
|
||||
|
||||
it('should handle multiple calls to final', async () => {
|
||||
const trace: unknown[] = []
|
||||
|
||||
const middlewares: Middleware<number[]>[] = [
|
||||
async (ctx, next) => {
|
||||
trace.push(1)
|
||||
await next([2])
|
||||
trace.push(3)
|
||||
await next([4])
|
||||
trace.push(5)
|
||||
},
|
||||
]
|
||||
|
||||
const composed = composeMiddlewares(middlewares, async (res) => {
|
||||
trace.push(res)
|
||||
})
|
||||
|
||||
await composed([])
|
||||
|
||||
expect(trace).toEqual([1, [2], 3, [4], 5])
|
||||
})
|
||||
|
||||
it('should handle multiple calls to next midway', async () => {
|
||||
const trace: unknown[] = []
|
||||
|
||||
const middlewares: Middleware<number[]>[] = [
|
||||
async (ctx, next) => {
|
||||
trace.push(1)
|
||||
await next([2])
|
||||
trace.push(3)
|
||||
await next([4])
|
||||
trace.push(5)
|
||||
},
|
||||
(ctx, next) => next([6, ...ctx]),
|
||||
]
|
||||
|
||||
const composed = composeMiddlewares(middlewares, async (res) => {
|
||||
trace.push(res)
|
||||
})
|
||||
|
||||
await composed([])
|
||||
|
||||
expect(trace).toEqual([1, [6, 2], 3, [6, 4], 5])
|
||||
})
|
||||
|
||||
it('should handle leaf middleware', async () => {
|
||||
const trace: unknown[] = []
|
||||
|
||||
const middlewares: Middleware<number[]>[] = [
|
||||
async (ctx, next) => {
|
||||
trace.push(1)
|
||||
|
||||
return next(ctx)
|
||||
},
|
||||
async () => {
|
||||
/* do nothing */
|
||||
},
|
||||
]
|
||||
|
||||
const composed = composeMiddlewares(middlewares, async (res) => {
|
||||
trace.push(res) // should not be called
|
||||
})
|
||||
|
||||
await composed([])
|
||||
|
||||
expect(trace).toEqual([1])
|
||||
})
|
||||
|
||||
it('should propagate return value', async () => {
|
||||
const trace: unknown[] = []
|
||||
|
||||
const middlewares: Middleware<number[], number>[] = [
|
||||
async (ctx, next) => {
|
||||
trace.push(1)
|
||||
const res = await next([2])
|
||||
trace.push(3)
|
||||
const res2 = await next([3, 4, 5])
|
||||
trace.push(6)
|
||||
|
||||
return res + res2
|
||||
},
|
||||
async (ctx, next) => {
|
||||
trace.push(-1)
|
||||
|
||||
return (await next(ctx)) + 1
|
||||
},
|
||||
]
|
||||
|
||||
const composed = composeMiddlewares(middlewares, async (res) => {
|
||||
trace.push(res)
|
||||
|
||||
return res.length
|
||||
})
|
||||
|
||||
const result = await composed([])
|
||||
|
||||
expect(trace).toEqual([1, -1, [2], 3, -1, [3, 4, 5], 6])
|
||||
expect(result).toBe(6)
|
||||
})
|
||||
|
||||
it('should propagate errors', async () => {
|
||||
const trace: unknown[] = []
|
||||
|
||||
const middlewares: Middleware<number>[] = [
|
||||
async (ctx, next) => {
|
||||
trace.push(1)
|
||||
|
||||
try {
|
||||
await next(2)
|
||||
} catch (e) {
|
||||
trace.push('caught error')
|
||||
}
|
||||
|
||||
trace.push(3)
|
||||
await next(4)
|
||||
trace.push(5)
|
||||
},
|
||||
(ctx, next) => next(ctx), // pass-thru
|
||||
async (ctx, next) => {
|
||||
if (ctx === 2) {
|
||||
trace.push('error')
|
||||
throw new Error('error')
|
||||
} else {
|
||||
trace.push('ok')
|
||||
|
||||
return next(ctx)
|
||||
}
|
||||
},
|
||||
]
|
||||
|
||||
const composed = composeMiddlewares(middlewares, async (res) => {
|
||||
trace.push(`final ${res}`)
|
||||
})
|
||||
|
||||
await composed(0)
|
||||
|
||||
expect(trace).toEqual([1, 'error', 'caught error', 3, 'ok', 'final 4', 5])
|
||||
})
|
||||
})
|
30
packages/core/src/utils/composer.ts
Normal file
30
packages/core/src/utils/composer.ts
Normal file
|
@ -0,0 +1,30 @@
|
|||
export type Middleware<Context, Result = void> = (
|
||||
ctx: Context,
|
||||
next: (ctx: Context) => Promise<Result>,
|
||||
) => Promise<Result>
|
||||
export type ComposedMiddleware<Context, Result = void> = (ctx: Context) => Promise<Result>
|
||||
|
||||
export function composeMiddlewares<Context, Result = void>(
|
||||
middlewares: Middleware<Context, Result>[],
|
||||
final: ComposedMiddleware<Context, Result>,
|
||||
): ComposedMiddleware<Context, Result> {
|
||||
middlewares = middlewares.slice()
|
||||
middlewares.push(final)
|
||||
|
||||
function dispatch(i: number, ctx: Context): Promise<Result> {
|
||||
const fn = middlewares[i]
|
||||
if (!fn) return final(ctx)
|
||||
|
||||
return fn(ctx, boundDispatches[i + 1])
|
||||
}
|
||||
|
||||
const boundDispatches: Array<(ctx: Context) => Promise<Result>> = []
|
||||
|
||||
for (let i = 0; i < middlewares.length; i++) {
|
||||
boundDispatches.push(dispatch.bind(null, i))
|
||||
}
|
||||
|
||||
return function (context: Context): Promise<Result> {
|
||||
return boundDispatches[0](context)
|
||||
}
|
||||
}
|
|
@ -8,6 +8,7 @@ export * from '../storage/service/default-dcs.js'
|
|||
export * from './async-lock.js'
|
||||
export * from './bigint-utils.js'
|
||||
export * from './buffer-utils.js'
|
||||
export * from './composer.js'
|
||||
export * from './condition-variable.js'
|
||||
export * from './controllable-promise.js'
|
||||
export * from './crypto/index.js'
|
||||
|
|
Loading…
Reference in a new issue