From 0cff4113f328f218d3e4f213115a3c3e23db6009 Mon Sep 17 00:00:00 2001 From: alina sireneva Date: Tue, 1 Oct 2024 04:03:18 +0300 Subject: [PATCH] chore!: migrate to fuman Emitter breaking: all events now use Emitter interface (`.on(event, ...) => `.onEvent.add(...)`) --- packages/core/scripts/generate-client.cjs | 68 ++- packages/core/src/highlevel/base.ts | 52 +-- packages/core/src/highlevel/client.ts | 440 ++++++++---------- packages/core/src/highlevel/client.types.ts | 12 +- .../core/src/highlevel/methods/_imports.ts | 2 + packages/core/src/highlevel/methods/_init.ts | 31 +- .../highlevel/types/updates/parse-update.ts | 6 +- .../core/src/highlevel/updates/manager.ts | 13 +- packages/core/src/highlevel/updates/parsed.ts | 23 +- packages/core/src/highlevel/updates/types.ts | 18 +- packages/core/src/network/client.ts | 21 +- .../src/network/multi-session-connection.ts | 63 ++- packages/core/src/network/network-manager.ts | 26 +- .../core/src/network/persistent-connection.ts | 21 +- .../core/src/network/session-connection.ts | 56 ++- packages/dispatcher/src/dispatcher.ts | 11 +- pnpm-lock.yaml | 3 - 17 files changed, 389 insertions(+), 477 deletions(-) diff --git a/packages/core/scripts/generate-client.cjs b/packages/core/scripts/generate-client.cjs index df081f49..53f222cb 100644 --- a/packages/core/scripts/generate-client.cjs +++ b/packages/core/scripts/generate-client.cjs @@ -464,8 +464,7 @@ async function main() { output.write( '/* eslint-disable ts/no-unsafe-declaration-merging, ts/no-unsafe-argument */\n' + '/* THIS FILE WAS AUTO-GENERATED */\n' - + '// eslint-disable-next-line unicorn/prefer-node-protocol\n' - + "import EventEmitter from 'events'\n" + + "import { Emitter } from '@fuman/utils'\n" + "import Long from 'long'\n", ) Object.entries(state.imports).forEach(([module, items]) => { @@ -483,36 +482,17 @@ async function main() { output.write('\nexport interface TelegramClient extends ITelegramClient {\n') - output.write(`/** - * Register a raw update handler - * - * @param name Event name - * @param handler Raw update handler - */ - on(name: 'raw_update', handler: ((upd: tl.TypeUpdate | tl.TypeMessage, peers: PeersIndex) => void)): this -/** - * Register a parsed update handler - * - * @param name Event name - * @param handler Raw update handler - */ - on(name: 'update', handler: ((upd: ParsedUpdate) => void)): this\n`) + output.write(`/** Raw update emitter */ +readonly onRawUpdate: Emitter +/** Parsed update emitter */ +readonly onUpdate: Emitter`) updates.types.forEach((type) => { - output.write(`/** - * Register ${updates.toSentence(type, 'inline')} - * - * @param name Event name - * @param handler ${updates.toSentence(type, 'full')} - */ -on(name: '${type.typeName}', handler: ((upd: ${type.updateType}) => void)): this\n`) + output.write(`/** ${updates.toSentence(type, 'inline')} */\n`) + output.write(`readonly on${type.handlerTypeName}: Emitter<${type.updateType}>\n`) }) - output.write(` -// eslint-disable-next-line ts/no-explicit-any -on(name: string, handler: (...args: any[]) => void): this\n - -/** + output.write(`/** * Wrap this client so that all RPC calls will use the specified parameters. * * @param params Parameters to use @@ -691,14 +671,31 @@ withParams(params: RpcCallOptions): this\n`) output.write('\nexport type { TelegramClientOptions }\n') output.write('\nexport * from "./base.js"\n') - output.write('\nexport class TelegramClient extends EventEmitter implements ITelegramClient {\n') + output.write('\nexport class TelegramClient implements ITelegramClient {\n') output.write(' _client: ITelegramClient\n') state.fields.forEach(({ code }) => output.write(`protected ${code}\n`)) output.write('constructor(opts: TelegramClientOptions) {\n') - output.write(' super()\n') + output.write(' ;(this as any).onRawUpdate = new Emitter()\n') + output.write(' ;(this as any).onUpdate = new Emitter()\n') + updates.types.forEach((type) => { + // we use declaration merging so we can't simply write into this because it thinks it's readonly and already has a value + output.write(` ;(this as any).on${type.handlerTypeName} = new Emitter()\n`) + }) state.init.forEach((code) => { + code = code.replace('// @generate-update-emitter', () => { + const lines = [ + ' switch (update.name) {', + ] + updates.types.forEach((type) => { + lines.push(` case '${type.typeName}':`) + lines.push(` this.on${type.handlerTypeName}.emit(update.data)`) + lines.push(' break') + }) + lines.push(' }') + return lines.join('\n') + }) output.write(`${code}\n`) }) output.write('}\n') @@ -734,8 +731,6 @@ withParams(params: RpcCallOptions): this\n`) 'getPrimaryDcId', 'computeSrpParams', 'computeNewPasswordHash', - 'onConnectionState', - 'getServerUpdateHandler', 'changePrimaryDc', 'getMtprotoMessageId', ].forEach((name) => { @@ -745,15 +740,6 @@ withParams(params: RpcCallOptions): this\n`) + '}\n', ) }) - // disabled methods - they are used internally and we don't want to expose them - // if the user *really* needs them, they can use `client._client` to access the underlying client - ;['onServerUpdate', 'onUpdate'].forEach((name) => { - output.write( - `TelegramClient.prototype.${name} = function() {\n` - + ` throw new Error('${name} is not available for TelegramClient, use .on() methods instead')\n` - + '}\n', - ) - }) state.impls.forEach(({ name, code }) => output.write(`TelegramClient.prototype.${name} = ${code}\n`)) // write methods re-exports to separate file diff --git a/packages/core/src/highlevel/base.ts b/packages/core/src/highlevel/base.ts index 9d5ebbc4..b4b8f62c 100644 --- a/packages/core/src/highlevel/base.ts +++ b/packages/core/src/highlevel/base.ts @@ -1,12 +1,12 @@ import type { mtp } from '@mtcute/tl' import { tl } from '@mtcute/tl' import type Long from 'long' +import { Emitter } from '@fuman/utils' import type { MtClientOptions } from '../network/client.js' import { MtClient } from '../network/client.js' import type { ConnectionKind, RpcCallOptions } from '../network/network-manager.js' import type { StorageManagerExtraOptions } from '../storage/storage.js' -import { MtArgumentError } from '../types/errors.js' import type { MustEqual } from '../types/utils.js' import { reportUnknownError } from '../utils/error-reporting.js' import type { @@ -25,13 +25,13 @@ import { import { LogManager } from '../utils/logger.js' import type { ICorePlatform } from '../types/platform' -import type { ConnectionState, ITelegramClient, ServerUpdateHandler } from './client.types.js' +import type { ConnectionState, ITelegramClient } from './client.types.js' import { AppConfigManager } from './managers/app-config-manager.js' import type { ITelegramStorageProvider } from './storage/provider.js' import type { TelegramStorageManagerExtraOptions } from './storage/storage.js' import { TelegramStorageManager } from './storage/storage.js' import { UpdatesManager } from './updates/manager.js' -import type { RawUpdateHandler, UpdatesManagerParams } from './updates/types.js' +import type { RawUpdateInfo, UpdatesManagerParams } from './updates/types.js' export interface BaseTelegramClientOptions extends MtClientOptions { storage: ITelegramStorageProvider @@ -51,8 +51,6 @@ function makeRpcError(raw: mtp.RawMt_rpc_error, stack: string, method?: string) export class BaseTelegramClient implements ITelegramClient { readonly updates?: UpdatesManager - private _serverUpdatesHandler: ServerUpdateHandler = () => {} - private _connectionStateHandler: (state: ConnectionState) => void = () => {} readonly log: Logger readonly mt: MtClient @@ -60,6 +58,10 @@ export class BaseTelegramClient implements ITelegramClient { readonly storage: TelegramStorageManager readonly platform: ICorePlatform + readonly onServerUpdate: Emitter = new Emitter() + readonly onRawUpdate: Emitter = new Emitter() + readonly onConnectionState: Emitter = new Emitter() + constructor(readonly params: BaseTelegramClientOptions) { this.log = this.params.logger ?? new LogManager('client', params.platform) this.platform = this.params.platform @@ -70,24 +72,18 @@ export class BaseTelegramClient implements ITelegramClient { if (!params.disableUpdates && params.updates !== false) { this.updates = new UpdatesManager(this, params.updates) - this._serverUpdatesHandler = this.updates.handleUpdate.bind(this.updates) + this.onServerUpdate.add(this.updates.handleUpdate.bind(this.updates)) this.updates.onCatchingUp((catchingUp) => { - this._connectionStateHandler(catchingUp ? 'updating' : 'connected') + this.onConnectionState.emit(catchingUp ? 'updating' : 'connected') }) } - this.mt.on('update', (update: tl.TypeUpdates) => { - this._serverUpdatesHandler(update) - }) - this.mt.on('usable', () => { - this._connectionStateHandler('connected') - }) - this.mt.on('wait', () => { - this._connectionStateHandler('connecting') - }) - this.mt.on('networkChanged', (connected: boolean) => { + this.mt.onUpdate.forwardTo(this.onServerUpdate) + this.mt.onUsable.add(() => this.onConnectionState.emit('connected')) + this.mt.onConnecting.add(() => this.onConnectionState.emit('connecting')) + this.mt.onNetworkChanged.add((connected: boolean) => { if (!connected) { - this._connectionStateHandler('offline') + this.onConnectionState.emit('offline') } }) @@ -311,26 +307,6 @@ export class BaseTelegramClient implements ITelegramClient { this.updates?.handleClientUpdate(updates, noDispatch) } - onServerUpdate(handler: ServerUpdateHandler): void { - this._serverUpdatesHandler = handler - } - - getServerUpdateHandler(): ServerUpdateHandler { - return this._serverUpdatesHandler - } - - onUpdate(handler: RawUpdateHandler): void { - if (!this.updates) { - throw new MtArgumentError('Updates manager is disabled') - } - - this.updates.setHandler(handler) - } - - onConnectionState(handler: (state: ConnectionState) => void): void { - this._connectionStateHandler = handler - } - async getApiCrenetials(): Promise<{ id: number hash: string diff --git a/packages/core/src/highlevel/client.ts b/packages/core/src/highlevel/client.ts index f683e224..49d46408 100644 --- a/packages/core/src/highlevel/client.ts +++ b/packages/core/src/highlevel/client.ts @@ -1,8 +1,6 @@ /* eslint-disable ts/no-unsafe-declaration-merging, ts/no-unsafe-argument */ /* THIS FILE WAS AUTO-GENERATED */ -// eslint-disable-next-line unicorn/prefer-node-protocol -import EventEmitter from 'events' - +import { Emitter } from '@fuman/utils' import type Long from 'long' import type { tdFileId } from '@mtcute/file-id' import type { tl } from '@mtcute/tl' @@ -14,7 +12,8 @@ import { MtUnsupportedError } from '../types/index.js' import type { BaseTelegramClientOptions } from './base.js' import { BaseTelegramClient } from './base.js' import type { ITelegramClient } from './client.types.js' -import type { AllStories, ArrayPaginated, ArrayWithTotal, Boost, BoostSlot, BoostStats, BotChatJoinRequestUpdate, BotCommands, BotReactionCountUpdate, BotReactionUpdate, BotStoppedUpdate, BusinessCallbackQuery, BusinessChatLink, BusinessConnection, BusinessMessage, BusinessWorkHoursDay, CallbackQuery, Chat, ChatEvent, ChatInviteLink, ChatInviteLinkMember, ChatJoinRequestUpdate, ChatMember, ChatMemberUpdate, ChatPreview, ChatlistPreview, ChosenInlineResult, CollectibleInfo, DeleteBusinessMessageUpdate, DeleteMessageUpdate, DeleteStoryUpdate, Dialog, FactCheck, FileDownloadLocation, FileDownloadParameters, ForumTopic, FullChat, GameHighScore, HistoryReadUpdate, InlineCallbackQuery, InlineQuery, InputChatEventFilters, InputDialogFolder, InputFileLike, InputInlineResult, InputMediaLike, InputMediaSticker, InputMessageId, InputPeerLike, InputPrivacyRule, InputReaction, InputStickerSet, InputStickerSetItem, InputText, MaybeDynamic, Message, MessageEffect, MessageMedia, MessageReactions, ParametersSkip2, ParsedUpdate, PeerReaction, PeerStories, PeersIndex, Photo, Poll, PollUpdate, PollVoteUpdate, PreCheckoutQuery, RawDocument, ReplyMarkup, SentCode, StarsStatus, StarsTransaction, Sticker, StickerSet, StickerSourceType, StickerType, StoriesStealthMode, Story, StoryInteractions, StoryUpdate, StoryViewer, StoryViewersList, TakeoutSession, TextWithEntities, TypingStatus, UploadFileLike, UploadedFile, User, UserStatusUpdate, UserTypingUpdate } from './types/index.js' +import type { RawUpdateInfo } from './updates/types.js' +import type { AllStories, ArrayPaginated, ArrayWithTotal, Boost, BoostSlot, BoostStats, BotChatJoinRequestUpdate, BotCommands, BotReactionCountUpdate, BotReactionUpdate, BotStoppedUpdate, BusinessCallbackQuery, BusinessChatLink, BusinessConnection, BusinessMessage, BusinessWorkHoursDay, CallbackQuery, Chat, ChatEvent, ChatInviteLink, ChatInviteLinkMember, ChatJoinRequestUpdate, ChatMember, ChatMemberUpdate, ChatPreview, ChatlistPreview, ChosenInlineResult, CollectibleInfo, DeleteBusinessMessageUpdate, DeleteMessageUpdate, DeleteStoryUpdate, Dialog, FactCheck, FileDownloadLocation, FileDownloadParameters, ForumTopic, FullChat, GameHighScore, HistoryReadUpdate, InlineCallbackQuery, InlineQuery, InputChatEventFilters, InputDialogFolder, InputFileLike, InputInlineResult, InputMediaLike, InputMediaSticker, InputMessageId, InputPeerLike, InputPrivacyRule, InputReaction, InputStickerSet, InputStickerSetItem, InputText, MaybeDynamic, Message, MessageEffect, MessageMedia, MessageReactions, ParametersSkip2, ParsedUpdate, PeerReaction, PeerStories, Photo, Poll, PollUpdate, PollVoteUpdate, PreCheckoutQuery, RawDocument, ReplyMarkup, SentCode, StarsStatus, StarsTransaction, Sticker, StickerSet, StickerSourceType, StickerType, StoriesStealthMode, Story, StoryInteractions, StoryUpdate, StoryViewer, StoryViewersList, TakeoutSession, TextWithEntities, TypingStatus, UploadFileLike, UploadedFile, User, UserStatusUpdate, UserTypingUpdate } from './types/index.js' import type { StringSessionData } from './utils/string-session.js' import type { ITelegramStorageProvider } from './storage/provider.js' import { Conversation } from './types/conversation.js' @@ -313,220 +312,65 @@ type TelegramClientOptions = ( } export interface TelegramClient extends ITelegramClient { -/** - * Register a raw update handler - * - * @param name Event name - * @param handler Raw update handler - */ - on(name: 'raw_update', handler: ((upd: tl.TypeUpdate | tl.TypeMessage, peers: PeersIndex) => void)): this - /** - * Register a parsed update handler - * - * @param name Event name - * @param handler Raw update handler - */ - on(name: 'update', handler: ((upd: ParsedUpdate) => void)): this - /** - * Register a new message handler - * - * @param name Event name - * @param handler New message handler - */ - on(name: 'new_message', handler: ((upd: Message) => void)): this - /** - * Register an edit message handler - * - * @param name Event name - * @param handler Edit message handler - */ - on(name: 'edit_message', handler: ((upd: Message) => void)): this - /** - * Register a message group handler - * - * @param name Event name - * @param handler Message group handler - */ - on(name: 'message_group', handler: ((upd: Message[]) => void)): this - /** - * Register a delete message handler - * - * @param name Event name - * @param handler Delete message handler - */ - on(name: 'delete_message', handler: ((upd: DeleteMessageUpdate) => void)): this - /** - * Register a chat member update handler - * - * @param name Event name - * @param handler Chat member update handler - */ - on(name: 'chat_member', handler: ((upd: ChatMemberUpdate) => void)): this - /** - * Register an inline query handler - * - * @param name Event name - * @param handler Inline query handler - */ - on(name: 'inline_query', handler: ((upd: InlineQuery) => void)): this - /** - * Register a chosen inline result handler - * - * @param name Event name - * @param handler Chosen inline result handler - */ - on(name: 'chosen_inline_result', handler: ((upd: ChosenInlineResult) => void)): this - /** - * Register a callback query handler - * - * @param name Event name - * @param handler Callback query handler - */ - on(name: 'callback_query', handler: ((upd: CallbackQuery) => void)): this - /** - * Register an inline callback query handler - * - * @param name Event name - * @param handler Inline callback query handler - */ - on(name: 'inline_callback_query', handler: ((upd: InlineCallbackQuery) => void)): this - /** - * Register a business callback query handler - * - * @param name Event name - * @param handler Business callback query handler - */ - on(name: 'business_callback_query', handler: ((upd: BusinessCallbackQuery) => void)): this - /** - * Register a poll update handler - * - * @param name Event name - * @param handler Poll update handler - */ - on(name: 'poll', handler: ((upd: PollUpdate) => void)): this - /** - * Register a poll vote handler - * - * @param name Event name - * @param handler Poll vote handler - */ - on(name: 'poll_vote', handler: ((upd: PollVoteUpdate) => void)): this - /** - * Register an user status update handler - * - * @param name Event name - * @param handler User status update handler - */ - on(name: 'user_status', handler: ((upd: UserStatusUpdate) => void)): this - /** - * Register an user typing handler - * - * @param name Event name - * @param handler User typing handler - */ - on(name: 'user_typing', handler: ((upd: UserTypingUpdate) => void)): this - /** - * Register a history read handler - * - * @param name Event name - * @param handler History read handler - */ - on(name: 'history_read', handler: ((upd: HistoryReadUpdate) => void)): this - /** - * Register a bot stopped handler - * - * @param name Event name - * @param handler Bot stopped handler - */ - on(name: 'bot_stopped', handler: ((upd: BotStoppedUpdate) => void)): this - /** - * Register a bot chat join request handler - * - * @param name Event name - * @param handler Bot chat join request handler - */ - on(name: 'bot_chat_join_request', handler: ((upd: BotChatJoinRequestUpdate) => void)): this - /** - * Register a chat join request handler - * - * @param name Event name - * @param handler Chat join request handler - */ - on(name: 'chat_join_request', handler: ((upd: ChatJoinRequestUpdate) => void)): this - /** - * Register a pre checkout query handler - * - * @param name Event name - * @param handler Pre checkout query handler - */ - on(name: 'pre_checkout_query', handler: ((upd: PreCheckoutQuery) => void)): this - /** - * Register a story update handler - * - * @param name Event name - * @param handler Story update handler - */ - on(name: 'story', handler: ((upd: StoryUpdate) => void)): this - /** - * Register a delete story handler - * - * @param name Event name - * @param handler Delete story handler - */ - on(name: 'delete_story', handler: ((upd: DeleteStoryUpdate) => void)): this - /** - * Register a bot reaction update handler - * - * @param name Event name - * @param handler Bot reaction update handler - */ - on(name: 'bot_reaction', handler: ((upd: BotReactionUpdate) => void)): this - /** - * Register a bot reaction count update handler - * - * @param name Event name - * @param handler Bot reaction count update handler - */ - on(name: 'bot_reaction_count', handler: ((upd: BotReactionCountUpdate) => void)): this - /** - * Register a business connection update handler - * - * @param name Event name - * @param handler Business connection update handler - */ - on(name: 'business_connection', handler: ((upd: BusinessConnection) => void)): this - /** - * Register a new business message handler - * - * @param name Event name - * @param handler New business message handler - */ - on(name: 'new_business_message', handler: ((upd: BusinessMessage) => void)): this - /** - * Register an edit business message handler - * - * @param name Event name - * @param handler Edit business message handler - */ - on(name: 'edit_business_message', handler: ((upd: BusinessMessage) => void)): this - /** - * Register a business message group handler - * - * @param name Event name - * @param handler Business message group handler - */ - on(name: 'business_message_group', handler: ((upd: BusinessMessage[]) => void)): this - /** - * Register a delete business message handler - * - * @param name Event name - * @param handler Delete business message handler - */ - on(name: 'delete_business_message', handler: ((upd: DeleteBusinessMessageUpdate) => void)): this - - // eslint-disable-next-line ts/no-explicit-any - on(name: string, handler: (...args: any[]) => void): this - +/** Raw update emitter */ + readonly onRawUpdate: Emitter + /** Parsed update emitter */ + readonly onUpdate: Emitter/** a new message handler */ + readonly onNewMessage: Emitter + /** an edit message handler */ + readonly onEditMessage: Emitter + /** a message group handler */ + readonly onMessageGroup: Emitter + /** a delete message handler */ + readonly onDeleteMessage: Emitter + /** a chat member update handler */ + readonly onChatMemberUpdate: Emitter + /** an inline query handler */ + readonly onInlineQuery: Emitter + /** a chosen inline result handler */ + readonly onChosenInlineResult: Emitter + /** a callback query handler */ + readonly onCallbackQuery: Emitter + /** an inline callback query handler */ + readonly onInlineCallbackQuery: Emitter + /** a business callback query handler */ + readonly onBusinessCallbackQuery: Emitter + /** a poll update handler */ + readonly onPollUpdate: Emitter + /** a poll vote handler */ + readonly onPollVote: Emitter + /** an user status update handler */ + readonly onUserStatusUpdate: Emitter + /** an user typing handler */ + readonly onUserTyping: Emitter + /** a history read handler */ + readonly onHistoryRead: Emitter + /** a bot stopped handler */ + readonly onBotStopped: Emitter + /** a bot chat join request handler */ + readonly onBotChatJoinRequest: Emitter + /** a chat join request handler */ + readonly onChatJoinRequest: Emitter + /** a pre checkout query handler */ + readonly onPreCheckoutQuery: Emitter + /** a story update handler */ + readonly onStoryUpdate: Emitter + /** a delete story handler */ + readonly onDeleteStory: Emitter + /** a bot reaction update handler */ + readonly onBotReactionUpdate: Emitter + /** a bot reaction count update handler */ + readonly onBotReactionCountUpdate: Emitter + /** a business connection update handler */ + readonly onBusinessConnectionUpdate: Emitter + /** a new business message handler */ + readonly onNewBusinessMessage: Emitter + /** an edit business message handler */ + readonly onEditBusinessMessage: Emitter + /** a business message group handler */ + readonly onBusinessMessageGroup: Emitter + /** a delete business message handler */ + readonly onDeleteBusinessMessage: Emitter /** * Wrap this client so that all RPC calls will use the specified parameters. * @@ -5750,48 +5594,156 @@ export type { TelegramClientOptions } export * from './base.js' -export class TelegramClient extends EventEmitter implements ITelegramClient { +export class TelegramClient implements ITelegramClient { _client: ITelegramClient constructor(opts: TelegramClientOptions) { - super() + ;(this as any).onRawUpdate = new Emitter() + ;(this as any).onUpdate = new Emitter() + ;(this as any).onNewMessage = new Emitter() + ;(this as any).onEditMessage = new Emitter() + ;(this as any).onMessageGroup = new Emitter() + ;(this as any).onDeleteMessage = new Emitter() + ;(this as any).onChatMemberUpdate = new Emitter() + ;(this as any).onInlineQuery = new Emitter() + ;(this as any).onChosenInlineResult = new Emitter() + ;(this as any).onCallbackQuery = new Emitter() + ;(this as any).onInlineCallbackQuery = new Emitter() + ;(this as any).onBusinessCallbackQuery = new Emitter() + ;(this as any).onPollUpdate = new Emitter() + ;(this as any).onPollVote = new Emitter() + ;(this as any).onUserStatusUpdate = new Emitter() + ;(this as any).onUserTyping = new Emitter() + ;(this as any).onHistoryRead = new Emitter() + ;(this as any).onBotStopped = new Emitter() + ;(this as any).onBotChatJoinRequest = new Emitter() + ;(this as any).onChatJoinRequest = new Emitter() + ;(this as any).onPreCheckoutQuery = new Emitter() + ;(this as any).onStoryUpdate = new Emitter() + ;(this as any).onDeleteStory = new Emitter() + ;(this as any).onBotReactionUpdate = new Emitter() + ;(this as any).onBotReactionCountUpdate = new Emitter() + ;(this as any).onBusinessConnectionUpdate = new Emitter() + ;(this as any).onNewBusinessMessage = new Emitter() + ;(this as any).onEditBusinessMessage = new Emitter() + ;(this as any).onBusinessMessageGroup = new Emitter() + ;(this as any).onDeleteBusinessMessage = new Emitter() if ('client' in opts) { this._client = opts.client } else { - if (!opts.storage || typeof opts.storage === 'string' || !opts.transport || !opts.crypto) { + if (!opts.storage || typeof opts.storage === 'string' || !opts.transport || !opts.crypto || !opts.platform) { throw new MtUnsupportedError( - 'You need to explicitly provide storage, transport and crypto for @mtcute/core', + 'You need to explicitly provide storage, transport, crypto and platform for @mtcute/core', ) } this._client = new BaseTelegramClient(opts as BaseTelegramClientOptions) } - // @ts-expect-error codegen - this.log = this._client.log - // @ts-expect-error codegen - this.storage = this._client.storage - Object.defineProperty(this, 'stopSignal', { - get: () => this._client.stopSignal, - }) - Object.defineProperty(this, 'appConfig', { - get: () => this._client.appConfig, - }) + Object.defineProperty(this, 'log', { value: this._client.log }) + Object.defineProperty(this, 'storage', { value: this._client.storage }) + Object.defineProperty(this, 'stopSignal', { value: this._client.stopSignal }) + Object.defineProperty(this, 'appConfig', { value: this._client.appConfig }) + Object.defineProperty(this, 'onServerUpdate', { value: this._client.onServerUpdate }) + Object.defineProperty(this, 'onRawUpdate', { value: this._client.onServerUpdate }) + Object.defineProperty(this, 'onConnectionState', { value: this._client.onConnectionState }) if (!opts.disableUpdates) { const skipConversationUpdates = opts.skipConversationUpdates ?? true const { messageGroupingInterval } = opts.updates ?? {} - this._client.onUpdate( + this._client.onRawUpdate.add( makeParsedUpdateHandler({ messageGroupingInterval, onUpdate: (update) => { if (Conversation.handleUpdate(this, update) && skipConversationUpdates) return - this.emit('update', update) - this.emit(update.name, update.data) - }, - onRawUpdate: (update, peers) => { - this.emit('raw_update', update, peers) + this.onUpdate.emit(update) + switch (update.name) { + case 'new_message': + this.onNewMessage.emit(update.data) + break + case 'edit_message': + this.onEditMessage.emit(update.data) + break + case 'message_group': + this.onMessageGroup.emit(update.data) + break + case 'delete_message': + this.onDeleteMessage.emit(update.data) + break + case 'chat_member': + this.onChatMemberUpdate.emit(update.data) + break + case 'inline_query': + this.onInlineQuery.emit(update.data) + break + case 'chosen_inline_result': + this.onChosenInlineResult.emit(update.data) + break + case 'callback_query': + this.onCallbackQuery.emit(update.data) + break + case 'inline_callback_query': + this.onInlineCallbackQuery.emit(update.data) + break + case 'business_callback_query': + this.onBusinessCallbackQuery.emit(update.data) + break + case 'poll': + this.onPollUpdate.emit(update.data) + break + case 'poll_vote': + this.onPollVote.emit(update.data) + break + case 'user_status': + this.onUserStatusUpdate.emit(update.data) + break + case 'user_typing': + this.onUserTyping.emit(update.data) + break + case 'history_read': + this.onHistoryRead.emit(update.data) + break + case 'bot_stopped': + this.onBotStopped.emit(update.data) + break + case 'bot_chat_join_request': + this.onBotChatJoinRequest.emit(update.data) + break + case 'chat_join_request': + this.onChatJoinRequest.emit(update.data) + break + case 'pre_checkout_query': + this.onPreCheckoutQuery.emit(update.data) + break + case 'story': + this.onStoryUpdate.emit(update.data) + break + case 'delete_story': + this.onDeleteStory.emit(update.data) + break + case 'bot_reaction': + this.onBotReactionUpdate.emit(update.data) + break + case 'bot_reaction_count': + this.onBotReactionCountUpdate.emit(update.data) + break + case 'business_connection': + this.onBusinessConnectionUpdate.emit(update.data) + break + case 'new_business_message': + this.onNewBusinessMessage.emit(update.data) + break + case 'edit_business_message': + this.onEditBusinessMessage.emit(update.data) + break + case 'business_message_group': + this.onBusinessMessageGroup.emit(update.data) + break + case 'delete_business_message': + this.onDeleteBusinessMessage.emit(update.data) + break + } }, }), ) @@ -6656,21 +6608,9 @@ TelegramClient.prototype.computeSrpParams = function (...args) { TelegramClient.prototype.computeNewPasswordHash = function (...args) { return this._client.computeNewPasswordHash(...args) } -TelegramClient.prototype.onConnectionState = function (...args) { - return this._client.onConnectionState(...args) -} -TelegramClient.prototype.getServerUpdateHandler = function (...args) { - return this._client.getServerUpdateHandler(...args) -} TelegramClient.prototype.changePrimaryDc = function (...args) { return this._client.changePrimaryDc(...args) } TelegramClient.prototype.getMtprotoMessageId = function (...args) { return this._client.getMtprotoMessageId(...args) } -TelegramClient.prototype.onServerUpdate = function () { - throw new Error('onServerUpdate is not available for TelegramClient, use .on() methods instead') -} -TelegramClient.prototype.onUpdate = function () { - throw new Error('onUpdate is not available for TelegramClient, use .on() methods instead') -} diff --git a/packages/core/src/highlevel/client.types.ts b/packages/core/src/highlevel/client.types.ts index 70c7b9a0..cd014f5a 100644 --- a/packages/core/src/highlevel/client.types.ts +++ b/packages/core/src/highlevel/client.types.ts @@ -1,5 +1,6 @@ import type { tl } from '@mtcute/tl' import type Long from 'long' +import type { Emitter } from '@fuman/utils' import type { ConnectionKind, RpcCallOptions } from '../network/index.js' import type { MustEqual, PublicPart } from '../types/utils.js' @@ -8,8 +9,8 @@ import type { ICorePlatform } from '../types/platform' import type { AppConfigManager } from './managers/app-config-manager.js' import type { TelegramStorageManager } from './storage/storage.js' -import type { RawUpdateHandler } from './updates/types.js' import type { StringSessionData } from './utils/string-session.js' +import type { RawUpdateInfo } from './updates/types.js' /** * Connection state of the client @@ -25,8 +26,6 @@ import type { StringSessionData } from './utils/string-session.js' */ export type ConnectionState = 'offline' | 'connecting' | 'updating' | 'connected' -export type ServerUpdateHandler = (update: tl.TypeUpdates) => void - // NB: when adding new methods, don't forget to add them to: // - worker/port.ts // - generate-client script @@ -57,10 +56,9 @@ export interface ITelegramClient { emitError(err: unknown): void handleClientUpdate(updates: tl.TypeUpdates, noDispatch?: boolean): void - onServerUpdate(handler: ServerUpdateHandler): void - getServerUpdateHandler(): ServerUpdateHandler - onUpdate(handler: RawUpdateHandler): void - onConnectionState(handler: (state: ConnectionState) => void): void + onServerUpdate: Emitter + onRawUpdate: Emitter + onConnectionState: Emitter getApiCrenetials(): Promise<{ id: number, hash: string }> // todo - this is only used for file dl/ul, which should probably be moved diff --git a/packages/core/src/highlevel/methods/_imports.ts b/packages/core/src/highlevel/methods/_imports.ts index c5ef2ec9..dd35827a 100644 --- a/packages/core/src/highlevel/methods/_imports.ts +++ b/packages/core/src/highlevel/methods/_imports.ts @@ -14,6 +14,8 @@ import { BaseTelegramClient, BaseTelegramClientOptions } from '../base.js' // @copy import { ITelegramClient } from '../client.types.js' // @copy +import { RawUpdateInfo } from '../updates/types.js' +// @copy import { AllStories, ArrayPaginated, diff --git a/packages/core/src/highlevel/methods/_init.ts b/packages/core/src/highlevel/methods/_init.ts index cc192351..a9c1b44a 100644 --- a/packages/core/src/highlevel/methods/_init.ts +++ b/packages/core/src/highlevel/methods/_init.ts @@ -12,7 +12,6 @@ import { Conversation } from '../types/conversation.js' import type { ParsedUpdateHandlerParams } from '../updates/parsed.js' // @copy import { makeParsedUpdateHandler } from '../updates/parsed.js' - // @copy type TelegramClientOptions = ( | (PartialOnly, 'transport' | 'crypto' | 'platform'> & { @@ -52,41 +51,35 @@ function _initializeClient(this: TelegramClient, opts: TelegramClientOptions) { if ('client' in opts) { this._client = opts.client } else { - if (!opts.storage || typeof opts.storage === 'string' || !opts.transport || !opts.crypto) { + if (!opts.storage || typeof opts.storage === 'string' || !opts.transport || !opts.crypto || !opts.platform) { throw new MtUnsupportedError( - 'You need to explicitly provide storage, transport and crypto for @mtcute/core', + 'You need to explicitly provide storage, transport, crypto and platform for @mtcute/core', ) } this._client = new BaseTelegramClient(opts as BaseTelegramClientOptions) } - // @ts-expect-error codegen - this.log = this._client.log - // @ts-expect-error codegen - this.storage = this._client.storage - Object.defineProperty(this, 'stopSignal', { - get: () => this._client.stopSignal, - }) - Object.defineProperty(this, 'appConfig', { - get: () => this._client.appConfig, - }) + Object.defineProperty(this, 'log', { value: this._client.log }) + Object.defineProperty(this, 'storage', { value: this._client.storage }) + Object.defineProperty(this, 'stopSignal', { value: this._client.stopSignal }) + Object.defineProperty(this, 'appConfig', { value: this._client.appConfig }) + Object.defineProperty(this, 'onServerUpdate', { value: this._client.onServerUpdate }) + Object.defineProperty(this, 'onRawUpdate', { value: this._client.onServerUpdate }) + Object.defineProperty(this, 'onConnectionState', { value: this._client.onConnectionState }) if (!opts.disableUpdates) { const skipConversationUpdates = opts.skipConversationUpdates ?? true const { messageGroupingInterval } = opts.updates ?? {} - this._client.onUpdate( + this._client.onRawUpdate.add( makeParsedUpdateHandler({ messageGroupingInterval, onUpdate: (update) => { if (Conversation.handleUpdate(this, update) && skipConversationUpdates) return - this.emit('update', update) - this.emit(update.name, update.data) - }, - onRawUpdate: (update, peers) => { - this.emit('raw_update', update, peers) + this.onUpdate.emit(update) + // @generate-update-emitter }, }), ) diff --git a/packages/core/src/highlevel/types/updates/parse-update.ts b/packages/core/src/highlevel/types/updates/parse-update.ts index 5779211f..bb57ab02 100644 --- a/packages/core/src/highlevel/types/updates/parse-update.ts +++ b/packages/core/src/highlevel/types/updates/parse-update.ts @@ -1,8 +1,5 @@ -import type { tl } from '@mtcute/tl' - import type { ParsedUpdate, - PeersIndex, } from '../index.js' import { BotChatJoinRequestUpdate, @@ -30,9 +27,10 @@ import { UserStatusUpdate, UserTypingUpdate, } from '../index.js' +import type { RawUpdateInfo } from '../../updates/types.js' /** @internal */ -export function _parseUpdate(update: tl.TypeUpdate, peers: PeersIndex): ParsedUpdate | null { +export function _parseUpdate({ update, peers }: RawUpdateInfo): ParsedUpdate | null { switch (update._) { case 'updateNewMessage': case 'updateNewChannelMessage': diff --git a/packages/core/src/highlevel/updates/manager.ts b/packages/core/src/highlevel/updates/manager.ts index 02fc7e5e..e73b46db 100644 --- a/packages/core/src/highlevel/updates/manager.ts +++ b/packages/core/src/highlevel/updates/manager.ts @@ -22,7 +22,7 @@ import type { CurrentUserInfo } from '../storage/service/current-user.js' import { PeersIndex } from '../types/peers/peers-index.js' import { _getChannelsBatched } from '../methods/chats/batched-queries.js' -import type { PendingUpdate, PendingUpdateContainer, RawUpdateHandler, UpdatesManagerParams } from './types.js' +import { type PendingUpdate, type PendingUpdateContainer, RawUpdateInfo, type UpdatesManagerParams } from './types.js' import { createDummyUpdatesContainer, extractChannelIdFromUpdate, @@ -146,7 +146,6 @@ export class UpdatesManager { channelsOpened: Map = new Map() log: Logger - private _handler: RawUpdateHandler = () => {} private _onCatchingUp: (catchingUp: boolean) => void = () => {} @@ -186,14 +185,6 @@ export class UpdatesManager { } } - setHandler(handler: RawUpdateHandler): void { - this._handler = handler - } - - getHandler(): RawUpdateHandler { - return this._handler - } - onCatchingUp(handler: (catchingUp: boolean) => void): void { this._onCatchingUp = handler } @@ -1352,7 +1343,7 @@ export class UpdatesManager { } log.debug('dispatching %s (postponed = %s)', upd._, postponed) - this._handler(upd, pending.peers) + client.onRawUpdate.emit(new RawUpdateInfo(upd, pending.peers)) } async _loop(): Promise { diff --git a/packages/core/src/highlevel/updates/parsed.ts b/packages/core/src/highlevel/updates/parsed.ts index 33c7bda7..2544ccc9 100644 --- a/packages/core/src/highlevel/updates/parsed.ts +++ b/packages/core/src/highlevel/updates/parsed.ts @@ -4,7 +4,7 @@ import type { Message } from '../types/messages/index.js' import type { BusinessMessage, ParsedUpdate } from '../types/updates/index.js' import { _parseUpdate } from '../types/updates/parse-update.js' -import type { RawUpdateHandler } from './types.js' +import type { RawUpdateInfo } from './types.js' export interface ParsedUpdateHandlerParams { /** @@ -29,32 +29,23 @@ export interface ParsedUpdateHandlerParams { /** Handler for parsed updates */ onUpdate: (update: ParsedUpdate) => void - /** - * Handler for raw updates. - * - * Note that this handler will be called **before** the parsed update handler. - */ - onRawUpdate?: RawUpdateHandler } -export function makeParsedUpdateHandler(params: ParsedUpdateHandlerParams): RawUpdateHandler { - const { messageGroupingInterval, onUpdate, onRawUpdate = () => {} } = params +export function makeParsedUpdateHandler(params: ParsedUpdateHandlerParams): (update: RawUpdateInfo) => void { + const { messageGroupingInterval, onUpdate } = params if (!messageGroupingInterval) { - return (update, peers) => { - const parsed = _parseUpdate(update, peers) + return (info) => { + const parsed = _parseUpdate(info) - onRawUpdate(update, peers) if (parsed) onUpdate(parsed) } } const pending = new Map() - return (update, peers) => { - const parsed = _parseUpdate(update, peers) - - onRawUpdate(update, peers) + return (info) => { + const parsed = _parseUpdate(info) if (parsed) { if (parsed.name === 'new_message' || parsed.name === 'new_business_message') { diff --git a/packages/core/src/highlevel/updates/types.ts b/packages/core/src/highlevel/updates/types.ts index 320265e4..987e8839 100644 --- a/packages/core/src/highlevel/updates/types.ts +++ b/packages/core/src/highlevel/updates/types.ts @@ -5,14 +5,15 @@ import type { EarlyTimer, Logger, SortedLinkedList } from '../../utils/index.js' import type { CurrentUserInfo } from '../storage/service/current-user.js' import type { PeersIndex } from '../types/peers/peers-index.js' -/** - * Function to be called for each update. - * - * @param upd The update - * @param peers Peers that are present in the update - */ -export type RawUpdateHandler = (upd: tl.TypeUpdate, peers: PeersIndex) => void - +/** Information about a raw update. */ +export class RawUpdateInfo { + constructor( + /** The update */ + readonly update: tl.TypeUpdate, + /** Peers that are present in the update */ + readonly peers: PeersIndex, + ) {} +} /** * Parameters for the updates manager */ @@ -167,6 +168,5 @@ export interface UpdatesState { log: Logger stop: () => void - handler: RawUpdateHandler auth: CurrentUserInfo | null } diff --git a/packages/core/src/network/client.ts b/packages/core/src/network/client.ts index 928e5c1c..1d90762e 100644 --- a/packages/core/src/network/client.ts +++ b/packages/core/src/network/client.ts @@ -1,12 +1,10 @@ -// eslint-disable-next-line unicorn/prefer-node-protocol -import EventEmitter from 'events' - import type { mtp } from '@mtcute/tl' import { tl } from '@mtcute/tl' import { __tlReaderMap as defaultReaderMap } from '@mtcute/tl/binary/reader.js' import { __tlWriterMap as defaultWriterMap } from '@mtcute/tl/binary/writer.js' import type { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime' import type { ReconnectionStrategy } from '@fuman/net' +import { Emitter } from '@fuman/utils' import type { IMtStorageProvider } from '../storage/provider.js' import type { StorageManagerExtraOptions } from '../storage/storage.js' @@ -180,7 +178,7 @@ export interface MtClientOptions { * to make RPC calls and receive low-level updates, as well as providing * some APIs to manage that. */ -export class MtClient extends EventEmitter { +export class MtClient { /** * Crypto provider taken from {@link MtClientOptions.crypto} */ @@ -223,9 +221,12 @@ export class MtClient extends EventEmitter { private _abortController: AbortController readonly stopSignal: AbortSignal - constructor(readonly params: MtClientOptions) { - super() + readonly onUsable: Emitter = new Emitter() + readonly onConnecting: Emitter = new Emitter() + readonly onNetworkChanged: Emitter = new Emitter() + readonly onUpdate: Emitter = new Emitter() + constructor(readonly params: MtClientOptions) { this.log = params.logger ?? new LogManager(undefined, params.platform) if (params.logLevel !== undefined) { @@ -281,10 +282,10 @@ export class MtClient extends EventEmitter { isPremium: false, useIpv6: Boolean(params.useIpv6), enableErrorReporting: params.enableErrorReporting ?? false, - onUsable: () => this.emit('usable'), - onConnecting: () => this.emit('connecting'), - onNetworkChanged: connected => this.emit('networkChanged', connected), - onUpdate: upd => this.emit('update', upd), + onUsable: this.onUsable.emit.bind(this.onUsable), + onConnecting: this.onConnecting.emit.bind(this.onConnecting), + onNetworkChanged: this.onNetworkChanged.emit.bind(this.onNetworkChanged), + onUpdate: this.onUpdate.emit.bind(this.onUpdate), stopSignal: this.stopSignal, platform: params.platform, ...params.network, diff --git a/packages/core/src/network/multi-session-connection.ts b/packages/core/src/network/multi-session-connection.ts index a98da6f4..b3f820f7 100644 --- a/packages/core/src/network/multi-session-connection.ts +++ b/packages/core/src/network/multi-session-connection.ts @@ -1,8 +1,5 @@ -// eslint-disable-next-line unicorn/prefer-node-protocol -import EventEmitter from 'events' - import type { mtp, tl } from '@mtcute/tl' -import { Deferred } from '@fuman/utils' +import { Deferred, Emitter } from '@fuman/utils' import type { Logger } from '../utils/index.js' @@ -11,18 +8,29 @@ import type { SessionConnectionParams } from './session-connection.js' import { SessionConnection } from './session-connection.js' import type { TelegramTransport } from './transports' -export class MultiSessionConnection extends EventEmitter { +export class MultiSessionConnection { private _log: Logger readonly _sessions: MtprotoSession[] private _enforcePfs = false + // NB: dont forget to update .reset() + readonly onRequestKeys: Emitter> = new Emitter() + readonly onError: Emitter = new Emitter() + readonly onUpdate: Emitter = new Emitter() + readonly onKeyChange: Emitter<[number, Uint8Array | null]> = new Emitter() + readonly onTmpKeyChange: Emitter<[number, Uint8Array | null, number]> = new Emitter() + readonly onFutureSalts: Emitter = new Emitter() + readonly onAuthBegin: Emitter = new Emitter() + readonly onUsable: Emitter = new Emitter() + readonly onWait: Emitter = new Emitter() + readonly onRequestAuth: Emitter = new Emitter() + constructor( readonly params: SessionConnectionParams, private _count: number, log: Logger, logPrefix = '', ) { - super() this._log = log.create('multi') if (logPrefix) this._log.prefix = `[${logPrefix}] ` this._enforcePfs = _count > 1 && params.isMainConnection @@ -119,7 +127,6 @@ export class MultiSessionConnection extends EventEmitter { if (this._connections.length > this._count) { // destroy extra connections for (let i = this._connections.length - 1; i >= this._count; i--) { - this._connections[i].removeAllListeners() this._connections[i].destroy().catch((err) => { this._log.warn('error destroying connection: %e', err) }) @@ -133,7 +140,7 @@ export class MultiSessionConnection extends EventEmitter { if (enforcePfsChanged) { // we need to fetch new auth keys first const promise = new Deferred() - this.emit('request-keys', promise) + this.onRequestKeys.emit(promise) promise.promise .then(() => { @@ -144,7 +151,7 @@ export class MultiSessionConnection extends EventEmitter { }) }) .catch((err) => { - this.emit('error', err) + this.onError.emit(err) }) } @@ -164,11 +171,11 @@ export class MultiSessionConnection extends EventEmitter { ) if (this.params.isMainConnection && this.params.isMainDcConnection) { - conn.on('update', update => this.emit('update', update)) + conn.onUpdate.add(update => this.onUpdate.emit(update)) } - conn.on('error', err => this.emit('error', err, conn)) - conn.on('key-change', (key) => { - this.emit('key-change', i, key) + conn.onError.add(err => this.onError.emit(err)) + conn.onKeyChange.add((key) => { + this.onKeyChange.emit([i, key]) // notify other connections for (const conn_ of this._connections) { @@ -176,11 +183,13 @@ export class MultiSessionConnection extends EventEmitter { conn_.onConnected() } }) - conn.on('tmp-key-change', (key, expires) => this.emit('tmp-key-change', i, key, expires)) - conn.on('future-salts', salts => this.emit('future-salts', salts)) - conn.on('auth-begin', () => { + conn.onTmpKeyChange.add( + event => this.onTmpKeyChange.emit(event === null ? [i, null, 0] : [i, event[0], event[1]]), + ) + conn.onFutureSalts.add(salts => this.onFutureSalts.emit(salts)) + conn.onAuthBegin.add(() => { this._log.debug('received auth-begin from connection %d', i) - this.emit('auth-begin', i) + this.onAuthBegin.emit(i) // we need to reset temp auth keys if there are any left @@ -189,10 +198,10 @@ export class MultiSessionConnection extends EventEmitter { if (conn_ !== conn) conn_.reconnect() }) }) - conn.on('usable', () => this.emit('usable', i)) - conn.on('wait', () => this.emit('wait', i)) - conn.on('request-auth', () => this.emit('request-auth', i)) - conn.on('flood-done', () => { + conn.onUsable.add(() => this.onUsable.emit(i)) + conn.onWait.add(() => this.onWait.emit(i)) + conn.onRequestAuth.add(() => this.onRequestAuth.emit(i)) + conn.onFloodDone.add(() => { this._log.debug('received flood-done from connection %d', i) this._connections.forEach(it => it.flushWhenIdle()) @@ -208,7 +217,17 @@ export class MultiSessionConnection extends EventEmitter { async destroy(): Promise { await Promise.all(this._connections.map(conn => conn.destroy())) this._sessions.forEach(sess => sess.reset()) - this.removeAllListeners() + + this.onRequestKeys.clear() + this.onError.clear() + this.onUpdate.clear() + this.onKeyChange.clear() + this.onTmpKeyChange.clear() + this.onFutureSalts.clear() + this.onAuthBegin.clear() + this.onUsable.clear() + this.onWait.clear() + this.onRequestAuth.clear() this._destroyed = true } diff --git a/packages/core/src/network/network-manager.ts b/packages/core/src/network/network-manager.ts index ce839fb9..b829be97 100644 --- a/packages/core/src/network/network-manager.ts +++ b/packages/core/src/network/network-manager.ts @@ -16,7 +16,7 @@ import type { ConfigManager } from './config-manager.js' import { basic as defaultMiddlewares } from './middlewares/default.js' import { MultiSessionConnection } from './multi-session-connection.js' import { ServerSaltManager } from './server-salt.js' -import type { SessionConnection, SessionConnectionParams } from './session-connection.js' +import type { SessionConnectionParams } from './session-connection.js' import type { TelegramTransport } from './transports/abstract.js' export type ConnectionKind = 'main' | 'upload' | 'download' | 'downloadSmall' @@ -43,7 +43,7 @@ export interface NetworkManagerParams { readerMap: TlReaderMap writerMap: TlWriterMap isPremium: boolean - emitError: (err: Error, connection?: SessionConnection) => void + emitError: (err: Error) => void onUpdate: (upd: tl.TypeUpdates) => void onUsable: () => void onConnecting: () => void @@ -314,7 +314,7 @@ export class DcConnectionManager { private _setupMulti(kind: ConnectionKind): void { const connection = this[kind] - connection.on('key-change', (idx, key: Uint8Array | null) => { + connection.onKeyChange.add(([idx, key]) => { if (kind !== 'main') { // main connection is responsible for authorization, // and keys are then sent to other connections @@ -340,7 +340,7 @@ export class DcConnectionManager { this.manager.params.emitError(e) }) }) - connection.on('tmp-key-change', (idx: number, key: Uint8Array | null, expires: number) => { + connection.onTmpKeyChange.add(([idx, key, expires]) => { if (kind !== 'main') { this.manager._log.warn('got tmp-key-change from non-main connection, ignoring') @@ -365,13 +365,13 @@ export class DcConnectionManager { this.manager.params.emitError(e) }) }) - connection.on('future-salts', (salts: mtp.RawMt_future_salt[]) => { + connection.onFutureSalts.add((salts: mtp.RawMt_future_salt[]) => { Promise.resolve(this.manager._storage.salts.store(this.dcId, salts)).catch((e: Error) => this.manager.params.emitError(e), ) }) - connection.on('auth-begin', () => { + connection.onAuthBegin.add(() => { // we need to propagate auth-begin to all connections // to avoid them sending requests before auth is complete if (kind !== 'main') { @@ -387,19 +387,19 @@ export class DcConnectionManager { this.downloadSmall.resetAuthKeys() }) - connection.on('request-auth', () => { + connection.onRequestAuth.add(() => { this.main.requestAuth() }) // fucking awesome architecture, but whatever - connection.on('request-keys', (promise: Deferred) => { + connection.onRequestKeys.add((promise: Deferred) => { this.loadKeys(true) .then(() => promise.resolve()) .catch((e: Error) => promise.reject(e)) }) - connection.on('error', (err: Error, conn: SessionConnection) => { - this.manager.params.emitError(err, conn) + connection.onError.add((err: Error) => { + this.manager.params.emitError(err) }) } @@ -562,15 +562,15 @@ export class NetworkManager { this.params.onConnecting() - dc.main.on('usable', () => { + dc.main.onUsable.add(() => { if (dc !== this._primaryDc) return this.params.onUsable() }) - dc.main.on('wait', () => { + dc.main.onWait.add(() => { if (dc !== this._primaryDc) return this.params.onConnecting() }) - dc.main.on('update', (update: tl.TypeUpdates) => { + dc.main.onUpdate.add((update: tl.TypeUpdates) => { this._updateHandler(update, false) }) diff --git a/packages/core/src/network/persistent-connection.ts b/packages/core/src/network/persistent-connection.ts index e50c7864..80c9c5fe 100644 --- a/packages/core/src/network/persistent-connection.ts +++ b/packages/core/src/network/persistent-connection.ts @@ -1,10 +1,7 @@ -// eslint-disable-next-line unicorn/prefer-node-protocol -import EventEmitter from 'events' - import type { ReconnectionStrategy } from '@fuman/net' import { PersistentConnection as FumanPersistentConnection } from '@fuman/net' import { FramedReader, FramedWriter } from '@fuman/io' -import { timers } from '@fuman/utils' +import { Emitter, timers } from '@fuman/utils' import type { BasicDcOption, ICryptoProvider, Logger } from '../utils/index.js' @@ -26,7 +23,7 @@ let nextConnectionUid = 0 * Only used for {@link PersistentConnection} and used as a mean of code splitting. * This class doesn't know anything about MTProto, it just manages the transport. */ -export abstract class PersistentConnection extends EventEmitter { +export abstract class PersistentConnection { private _uid = nextConnectionUid++ readonly params: PersistentConnectionParams @@ -45,10 +42,13 @@ export abstract class PersistentConnection extends EventEmitter { _destroyed = false _usable = false + readonly onWait: Emitter = new Emitter() + readonly onUsable: Emitter = new Emitter() + readonly onError: Emitter = new Emitter() + protected abstract onConnected(): void protected abstract onClosed(): void - - protected abstract onError(err: Error): void + protected abstract handleError(err: Error): void protected abstract onMessage(data: Uint8Array): void @@ -56,7 +56,6 @@ export abstract class PersistentConnection extends EventEmitter { params: PersistentConnectionParams, readonly log: Logger, ) { - super() this.params = params this.params.transport.setup?.(this.params.crypto, log) @@ -76,7 +75,7 @@ export abstract class PersistentConnection extends EventEmitter { onWait: (wait) => { this._updateLogPrefix() this.log.debug('waiting for %d ms before reconnecting', wait) - this.emit('wait', wait) + this.onWait.emit(wait) }, }) @@ -125,7 +124,7 @@ export abstract class PersistentConnection extends EventEmitter { } this._rescheduleInactivity() - this.emit('usable') // is this needed? + this.onUsable.emit() // is this needed? this.onConnected() while (true) { @@ -148,7 +147,7 @@ export abstract class PersistentConnection extends EventEmitter { private async _onError(err: Error) { this._updateLogPrefix() - this.onError(err) + this.handleError(err) return 'reconnect' as const } diff --git a/packages/core/src/network/session-connection.ts b/packages/core/src/network/session-connection.ts index 00f015f1..e0a1c7dd 100644 --- a/packages/core/src/network/session-connection.ts +++ b/packages/core/src/network/session-connection.ts @@ -3,7 +3,7 @@ import type { mtp } from '@mtcute/tl' import { tl } from '@mtcute/tl' import type { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime' import { TlBinaryReader, TlBinaryWriter, TlSerializationCounter } from '@mtcute/tl-runtime' -import { Deferred, u8 } from '@fuman/utils' +import { Deferred, Emitter, u8 } from '@fuman/utils' import { MtArgumentError, MtTimeoutError, MtcuteError } from '../types/index.js' import { createAesIgeForMessageOld } from '../utils/crypto/mtproto.js' @@ -88,6 +88,16 @@ export class SessionConnection extends PersistentConnection { // https://github.com/tdlib/td/blob/91aa6c9e4d0774eabf4f8d7f3aa51239032059a6/td/mtproto/SessionConnection.h private _pingInterval: number + // NB: dont forget to update .reset() + readonly onDisconnect: Emitter = new Emitter() + readonly onKeyChange: Emitter = new Emitter() + readonly onTmpKeyChange: Emitter<[Uint8Array, number] | null> = new Emitter() + readonly onFloodDone: Emitter = new Emitter() + readonly onRequestAuth: Emitter = new Emitter() + readonly onAuthBegin: Emitter = new Emitter() + readonly onUpdate: Emitter = new Emitter() + readonly onFutureSalts: Emitter = new Emitter() + constructor( params: SessionConnectionParams, readonly _session: MtprotoSession, @@ -148,7 +158,7 @@ export class SessionConnection extends PersistentConnection { } } - this.emit('disconnect') + this.onDisconnect.emit() this.reset() } @@ -165,8 +175,18 @@ export class SessionConnection extends PersistentConnection { if (forever) { timers.clearTimeout(this._pfsUpdateTimeout) - this.removeAllListeners() - this.on('error', (err) => { + this.onDisconnect.clear() + this.onKeyChange.clear() + this.onTmpKeyChange.clear() + this.onFloodDone.clear() + this.onRequestAuth.clear() + this.onAuthBegin.clear() + this.onUpdate.clear() + this.onFutureSalts.clear() + this.onWait.clear() + this.onUsable.clear() + this.onError.clear() + this.onError.add((err) => { this.log.warn('caught error after destroying: %s', err) }) } @@ -202,7 +222,7 @@ export class SessionConnection extends PersistentConnection { this.onConnectionUsable() } - protected onError(error: Error): void { + protected handleError(error: Error): void { // https://core.telegram.org/mtproto/mtproto-_transports#_transport-errors if (error instanceof TransportError) { if (error.code === 404) { @@ -241,8 +261,8 @@ export class SessionConnection extends PersistentConnection { this.log.info('transport error 404, reauthorizing') this._session.resetAuthKey() this._resetSession() - this.emit('key-change', null) - this.emit('error', error) + this.onKeyChange.emit(null) + this.onError.emit(error) return } @@ -252,13 +272,13 @@ export class SessionConnection extends PersistentConnection { this._onAllFailed(`transport error ${error.code}`) if (error.code === 429) { - this._session.onTransportFlood(this.emit.bind(this, 'flood-done')) + this._session.onTransportFlood(() => this.onFloodDone.emit()) return } } - this.emit('error', error) + this.onError.emit(error) } protected onConnectionUsable(): void { @@ -285,13 +305,13 @@ export class SessionConnection extends PersistentConnection { if (!this.params.isMainConnection) { // we don't authorize on non-main connections this.log.debug('_authorize(): non-main connection, requesting...') - this.emit('request-auth') + this.onRequestAuth.emit() return } this._session.authorizationPending = true - this.emit('auth-begin') + this.onAuthBegin.emit() doAuthorization(this, this._crypto) .then(([authKey, serverSalt, timeOffset]) => { @@ -301,7 +321,7 @@ export class SessionConnection extends PersistentConnection { this._session.authorizationPending = false - this.emit('key-change', authKey) + this.onKeyChange.emit(authKey) if (this._usePfs) { return this._authorizePfs() @@ -312,7 +332,7 @@ export class SessionConnection extends PersistentConnection { this._session.authorizationPending = false if (this._destroyed) return this.log.error('Authorization error: %e', err) - this.onError(err) + this.handleError(err) this.reconnect() }) } @@ -464,7 +484,7 @@ export class SessionConnection extends PersistentConnection { // we must re-init connection after binding temp key this._session.initConnectionCalled = false - this.emit('tmp-key-change', tempAuthKey, inner.expiresAt) + this.onTmpKeyChange.emit([tempAuthKey, inner.expiresAt]) this.onConnectionUsable() // set a timeout to update temp auth key in advance to avoid interruption @@ -489,7 +509,7 @@ export class SessionConnection extends PersistentConnection { } this._isPfsBindingPending = false - this.onError(err) + this.handleError(err) this.reconnect() }) } @@ -661,7 +681,7 @@ export class SessionConnection extends PersistentConnection { break } - this.emit('update', message) + this.onUpdate.emit(message) return } @@ -1145,7 +1165,7 @@ export class SessionConnection extends PersistentConnection { // force the client to fetch missed updates // when _lastSessionCreatedUid == 0, the connection has // just been established, and the client will fetch them anyways - this.emit('update', { _: 'updatesTooLong' }) + this.onUpdate.emit({ _: 'updatesTooLong' }) } this._salts.currentSalt = serverSalt @@ -1287,7 +1307,7 @@ export class SessionConnection extends PersistentConnection { this._salts.isFetching = false this._salts.setFutureSalts(msg.salts.slice()) - this.emit('future-salts', msg.salts) + this.onFutureSalts.emit(msg.salts) } private _onDestroySessionResult(msg: mtp.TypeDestroySessionRes): void { diff --git a/packages/dispatcher/src/dispatcher.ts b/packages/dispatcher/src/dispatcher.ts index 5bc51720..a09c3a0f 100644 --- a/packages/dispatcher/src/dispatcher.ts +++ b/packages/dispatcher/src/dispatcher.ts @@ -19,6 +19,7 @@ import type { PeersIndex, PollUpdate, PollVoteUpdate, + RawUpdateInfo, StoryUpdate, UserStatusUpdate, UserTypingUpdate, @@ -270,8 +271,8 @@ export class Dispatcher { * Dispatcher also uses bound client to throw errors */ bindToClient(client: TelegramClient): void { - client.on('update', this.dispatchUpdate) - client.on('raw_update', this.dispatchRawUpdate) + client.onUpdate.add(this.dispatchUpdate) + client.onRawUpdate.add(this.dispatchRawUpdate) this._client = client } @@ -281,8 +282,8 @@ export class Dispatcher { */ unbind(): void { if (this._client) { - this._client.off('update', this.dispatchUpdate) - this._client.off('raw_update', this.dispatchRawUpdate) + this._client.onUpdate.remove(this.dispatchUpdate) + this._client.onRawUpdate.remove(this.dispatchRawUpdate) this._client = undefined } @@ -322,7 +323,7 @@ export class Dispatcher { * @param update Update to process * @param peers Peers index */ - dispatchRawUpdate(update: tl.TypeUpdate | tl.TypeMessage, peers: PeersIndex): void { + dispatchRawUpdate({ update, peers }: RawUpdateInfo): void { if (!this._client) return // order does not matter in the dispatcher, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 43db4ae6..464f08df 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -17,9 +17,6 @@ importers: '@fuman/jsr': specifier: workspace:^ version: link:private/fuman/packages/jsr - '@fuman/utils': - specifier: workspace:^ - version: link:private/fuman/packages/utils '@types/deno': specifier: npm:@teidesu/deno-types@1.46.3 version: '@teidesu/deno-types@1.46.3'