chore!: migrate to fuman Emitter

breaking: all events now use Emitter interface (`.on(event, ...) => `.onEvent.add(...)`)
This commit is contained in:
alina 🌸 2024-10-01 04:03:18 +03:00
parent 26d068eeb2
commit 0cff4113f3
Signed by: teidesu
SSH key fingerprint: SHA256:uNeCpw6aTSU4aIObXLvHfLkDa82HWH9EiOj9AXOIRpI
17 changed files with 389 additions and 477 deletions

View file

@ -464,8 +464,7 @@ async function main() {
output.write( output.write(
'/* eslint-disable ts/no-unsafe-declaration-merging, ts/no-unsafe-argument */\n' '/* eslint-disable ts/no-unsafe-declaration-merging, ts/no-unsafe-argument */\n'
+ '/* THIS FILE WAS AUTO-GENERATED */\n' + '/* THIS FILE WAS AUTO-GENERATED */\n'
+ '// eslint-disable-next-line unicorn/prefer-node-protocol\n' + "import { Emitter } from '@fuman/utils'\n"
+ "import EventEmitter from 'events'\n"
+ "import Long from 'long'\n", + "import Long from 'long'\n",
) )
Object.entries(state.imports).forEach(([module, items]) => { Object.entries(state.imports).forEach(([module, items]) => {
@ -483,36 +482,17 @@ async function main() {
output.write('\nexport interface TelegramClient extends ITelegramClient {\n') output.write('\nexport interface TelegramClient extends ITelegramClient {\n')
output.write(`/** output.write(`/** Raw update emitter */
* Register a raw update handler readonly onRawUpdate: Emitter<RawUpdateInfo>
* /** Parsed update emitter */
* @param name Event name readonly onUpdate: Emitter<ParsedUpdate>`)
* @param handler Raw update handler
*/
on(name: 'raw_update', handler: ((upd: tl.TypeUpdate | tl.TypeMessage, peers: PeersIndex) => void)): this
/**
* Register a parsed update handler
*
* @param name Event name
* @param handler Raw update handler
*/
on(name: 'update', handler: ((upd: ParsedUpdate) => void)): this\n`)
updates.types.forEach((type) => { updates.types.forEach((type) => {
output.write(`/** output.write(`/** ${updates.toSentence(type, 'inline')} */\n`)
* Register ${updates.toSentence(type, 'inline')} output.write(`readonly on${type.handlerTypeName}: Emitter<${type.updateType}>\n`)
*
* @param name Event name
* @param handler ${updates.toSentence(type, 'full')}
*/
on(name: '${type.typeName}', handler: ((upd: ${type.updateType}) => void)): this\n`)
}) })
output.write(` output.write(`/**
// eslint-disable-next-line ts/no-explicit-any
on(name: string, handler: (...args: any[]) => void): this\n
/**
* Wrap this client so that all RPC calls will use the specified parameters. * Wrap this client so that all RPC calls will use the specified parameters.
* *
* @param params Parameters to use * @param params Parameters to use
@ -691,14 +671,31 @@ withParams(params: RpcCallOptions): this\n`)
output.write('\nexport type { TelegramClientOptions }\n') output.write('\nexport type { TelegramClientOptions }\n')
output.write('\nexport * from "./base.js"\n') output.write('\nexport * from "./base.js"\n')
output.write('\nexport class TelegramClient extends EventEmitter implements ITelegramClient {\n') output.write('\nexport class TelegramClient implements ITelegramClient {\n')
output.write(' _client: ITelegramClient\n') output.write(' _client: ITelegramClient\n')
state.fields.forEach(({ code }) => output.write(`protected ${code}\n`)) state.fields.forEach(({ code }) => output.write(`protected ${code}\n`))
output.write('constructor(opts: TelegramClientOptions) {\n') output.write('constructor(opts: TelegramClientOptions) {\n')
output.write(' super()\n') output.write(' ;(this as any).onRawUpdate = new Emitter()\n')
output.write(' ;(this as any).onUpdate = new Emitter()\n')
updates.types.forEach((type) => {
// we use declaration merging so we can't simply write into this because it thinks it's readonly and already has a value
output.write(` ;(this as any).on${type.handlerTypeName} = new Emitter()\n`)
})
state.init.forEach((code) => { state.init.forEach((code) => {
code = code.replace('// @generate-update-emitter', () => {
const lines = [
' switch (update.name) {',
]
updates.types.forEach((type) => {
lines.push(` case '${type.typeName}':`)
lines.push(` this.on${type.handlerTypeName}.emit(update.data)`)
lines.push(' break')
})
lines.push(' }')
return lines.join('\n')
})
output.write(`${code}\n`) output.write(`${code}\n`)
}) })
output.write('}\n') output.write('}\n')
@ -734,8 +731,6 @@ withParams(params: RpcCallOptions): this\n`)
'getPrimaryDcId', 'getPrimaryDcId',
'computeSrpParams', 'computeSrpParams',
'computeNewPasswordHash', 'computeNewPasswordHash',
'onConnectionState',
'getServerUpdateHandler',
'changePrimaryDc', 'changePrimaryDc',
'getMtprotoMessageId', 'getMtprotoMessageId',
].forEach((name) => { ].forEach((name) => {
@ -745,15 +740,6 @@ withParams(params: RpcCallOptions): this\n`)
+ '}\n', + '}\n',
) )
}) })
// disabled methods - they are used internally and we don't want to expose them
// if the user *really* needs them, they can use `client._client` to access the underlying client
;['onServerUpdate', 'onUpdate'].forEach((name) => {
output.write(
`TelegramClient.prototype.${name} = function() {\n`
+ ` throw new Error('${name} is not available for TelegramClient, use .on() methods instead')\n`
+ '}\n',
)
})
state.impls.forEach(({ name, code }) => output.write(`TelegramClient.prototype.${name} = ${code}\n`)) state.impls.forEach(({ name, code }) => output.write(`TelegramClient.prototype.${name} = ${code}\n`))
// write methods re-exports to separate file // write methods re-exports to separate file

View file

@ -1,12 +1,12 @@
import type { mtp } from '@mtcute/tl' import type { mtp } from '@mtcute/tl'
import { tl } from '@mtcute/tl' import { tl } from '@mtcute/tl'
import type Long from 'long' import type Long from 'long'
import { Emitter } from '@fuman/utils'
import type { MtClientOptions } from '../network/client.js' import type { MtClientOptions } from '../network/client.js'
import { MtClient } from '../network/client.js' import { MtClient } from '../network/client.js'
import type { ConnectionKind, RpcCallOptions } from '../network/network-manager.js' import type { ConnectionKind, RpcCallOptions } from '../network/network-manager.js'
import type { StorageManagerExtraOptions } from '../storage/storage.js' import type { StorageManagerExtraOptions } from '../storage/storage.js'
import { MtArgumentError } from '../types/errors.js'
import type { MustEqual } from '../types/utils.js' import type { MustEqual } from '../types/utils.js'
import { reportUnknownError } from '../utils/error-reporting.js' import { reportUnknownError } from '../utils/error-reporting.js'
import type { import type {
@ -25,13 +25,13 @@ import {
import { LogManager } from '../utils/logger.js' import { LogManager } from '../utils/logger.js'
import type { ICorePlatform } from '../types/platform' import type { ICorePlatform } from '../types/platform'
import type { ConnectionState, ITelegramClient, ServerUpdateHandler } from './client.types.js' import type { ConnectionState, ITelegramClient } from './client.types.js'
import { AppConfigManager } from './managers/app-config-manager.js' import { AppConfigManager } from './managers/app-config-manager.js'
import type { ITelegramStorageProvider } from './storage/provider.js' import type { ITelegramStorageProvider } from './storage/provider.js'
import type { TelegramStorageManagerExtraOptions } from './storage/storage.js' import type { TelegramStorageManagerExtraOptions } from './storage/storage.js'
import { TelegramStorageManager } from './storage/storage.js' import { TelegramStorageManager } from './storage/storage.js'
import { UpdatesManager } from './updates/manager.js' import { UpdatesManager } from './updates/manager.js'
import type { RawUpdateHandler, UpdatesManagerParams } from './updates/types.js' import type { RawUpdateInfo, UpdatesManagerParams } from './updates/types.js'
export interface BaseTelegramClientOptions extends MtClientOptions { export interface BaseTelegramClientOptions extends MtClientOptions {
storage: ITelegramStorageProvider storage: ITelegramStorageProvider
@ -51,8 +51,6 @@ function makeRpcError(raw: mtp.RawMt_rpc_error, stack: string, method?: string)
export class BaseTelegramClient implements ITelegramClient { export class BaseTelegramClient implements ITelegramClient {
readonly updates?: UpdatesManager readonly updates?: UpdatesManager
private _serverUpdatesHandler: ServerUpdateHandler = () => {}
private _connectionStateHandler: (state: ConnectionState) => void = () => {}
readonly log: Logger readonly log: Logger
readonly mt: MtClient readonly mt: MtClient
@ -60,6 +58,10 @@ export class BaseTelegramClient implements ITelegramClient {
readonly storage: TelegramStorageManager readonly storage: TelegramStorageManager
readonly platform: ICorePlatform readonly platform: ICorePlatform
readonly onServerUpdate: Emitter<tl.TypeUpdates> = new Emitter()
readonly onRawUpdate: Emitter<RawUpdateInfo> = new Emitter()
readonly onConnectionState: Emitter<ConnectionState> = new Emitter()
constructor(readonly params: BaseTelegramClientOptions) { constructor(readonly params: BaseTelegramClientOptions) {
this.log = this.params.logger ?? new LogManager('client', params.platform) this.log = this.params.logger ?? new LogManager('client', params.platform)
this.platform = this.params.platform this.platform = this.params.platform
@ -70,24 +72,18 @@ export class BaseTelegramClient implements ITelegramClient {
if (!params.disableUpdates && params.updates !== false) { if (!params.disableUpdates && params.updates !== false) {
this.updates = new UpdatesManager(this, params.updates) this.updates = new UpdatesManager(this, params.updates)
this._serverUpdatesHandler = this.updates.handleUpdate.bind(this.updates) this.onServerUpdate.add(this.updates.handleUpdate.bind(this.updates))
this.updates.onCatchingUp((catchingUp) => { this.updates.onCatchingUp((catchingUp) => {
this._connectionStateHandler(catchingUp ? 'updating' : 'connected') this.onConnectionState.emit(catchingUp ? 'updating' : 'connected')
}) })
} }
this.mt.on('update', (update: tl.TypeUpdates) => { this.mt.onUpdate.forwardTo(this.onServerUpdate)
this._serverUpdatesHandler(update) this.mt.onUsable.add(() => this.onConnectionState.emit('connected'))
}) this.mt.onConnecting.add(() => this.onConnectionState.emit('connecting'))
this.mt.on('usable', () => { this.mt.onNetworkChanged.add((connected: boolean) => {
this._connectionStateHandler('connected')
})
this.mt.on('wait', () => {
this._connectionStateHandler('connecting')
})
this.mt.on('networkChanged', (connected: boolean) => {
if (!connected) { if (!connected) {
this._connectionStateHandler('offline') this.onConnectionState.emit('offline')
} }
}) })
@ -311,26 +307,6 @@ export class BaseTelegramClient implements ITelegramClient {
this.updates?.handleClientUpdate(updates, noDispatch) this.updates?.handleClientUpdate(updates, noDispatch)
} }
onServerUpdate(handler: ServerUpdateHandler): void {
this._serverUpdatesHandler = handler
}
getServerUpdateHandler(): ServerUpdateHandler {
return this._serverUpdatesHandler
}
onUpdate(handler: RawUpdateHandler): void {
if (!this.updates) {
throw new MtArgumentError('Updates manager is disabled')
}
this.updates.setHandler(handler)
}
onConnectionState(handler: (state: ConnectionState) => void): void {
this._connectionStateHandler = handler
}
async getApiCrenetials(): Promise<{ async getApiCrenetials(): Promise<{
id: number id: number
hash: string hash: string

View file

@ -1,8 +1,6 @@
/* eslint-disable ts/no-unsafe-declaration-merging, ts/no-unsafe-argument */ /* eslint-disable ts/no-unsafe-declaration-merging, ts/no-unsafe-argument */
/* THIS FILE WAS AUTO-GENERATED */ /* THIS FILE WAS AUTO-GENERATED */
// eslint-disable-next-line unicorn/prefer-node-protocol import { Emitter } from '@fuman/utils'
import EventEmitter from 'events'
import type Long from 'long' import type Long from 'long'
import type { tdFileId } from '@mtcute/file-id' import type { tdFileId } from '@mtcute/file-id'
import type { tl } from '@mtcute/tl' import type { tl } from '@mtcute/tl'
@ -14,7 +12,8 @@ import { MtUnsupportedError } from '../types/index.js'
import type { BaseTelegramClientOptions } from './base.js' import type { BaseTelegramClientOptions } from './base.js'
import { BaseTelegramClient } from './base.js' import { BaseTelegramClient } from './base.js'
import type { ITelegramClient } from './client.types.js' import type { ITelegramClient } from './client.types.js'
import type { AllStories, ArrayPaginated, ArrayWithTotal, Boost, BoostSlot, BoostStats, BotChatJoinRequestUpdate, BotCommands, BotReactionCountUpdate, BotReactionUpdate, BotStoppedUpdate, BusinessCallbackQuery, BusinessChatLink, BusinessConnection, BusinessMessage, BusinessWorkHoursDay, CallbackQuery, Chat, ChatEvent, ChatInviteLink, ChatInviteLinkMember, ChatJoinRequestUpdate, ChatMember, ChatMemberUpdate, ChatPreview, ChatlistPreview, ChosenInlineResult, CollectibleInfo, DeleteBusinessMessageUpdate, DeleteMessageUpdate, DeleteStoryUpdate, Dialog, FactCheck, FileDownloadLocation, FileDownloadParameters, ForumTopic, FullChat, GameHighScore, HistoryReadUpdate, InlineCallbackQuery, InlineQuery, InputChatEventFilters, InputDialogFolder, InputFileLike, InputInlineResult, InputMediaLike, InputMediaSticker, InputMessageId, InputPeerLike, InputPrivacyRule, InputReaction, InputStickerSet, InputStickerSetItem, InputText, MaybeDynamic, Message, MessageEffect, MessageMedia, MessageReactions, ParametersSkip2, ParsedUpdate, PeerReaction, PeerStories, PeersIndex, Photo, Poll, PollUpdate, PollVoteUpdate, PreCheckoutQuery, RawDocument, ReplyMarkup, SentCode, StarsStatus, StarsTransaction, Sticker, StickerSet, StickerSourceType, StickerType, StoriesStealthMode, Story, StoryInteractions, StoryUpdate, StoryViewer, StoryViewersList, TakeoutSession, TextWithEntities, TypingStatus, UploadFileLike, UploadedFile, User, UserStatusUpdate, UserTypingUpdate } from './types/index.js' import type { RawUpdateInfo } from './updates/types.js'
import type { AllStories, ArrayPaginated, ArrayWithTotal, Boost, BoostSlot, BoostStats, BotChatJoinRequestUpdate, BotCommands, BotReactionCountUpdate, BotReactionUpdate, BotStoppedUpdate, BusinessCallbackQuery, BusinessChatLink, BusinessConnection, BusinessMessage, BusinessWorkHoursDay, CallbackQuery, Chat, ChatEvent, ChatInviteLink, ChatInviteLinkMember, ChatJoinRequestUpdate, ChatMember, ChatMemberUpdate, ChatPreview, ChatlistPreview, ChosenInlineResult, CollectibleInfo, DeleteBusinessMessageUpdate, DeleteMessageUpdate, DeleteStoryUpdate, Dialog, FactCheck, FileDownloadLocation, FileDownloadParameters, ForumTopic, FullChat, GameHighScore, HistoryReadUpdate, InlineCallbackQuery, InlineQuery, InputChatEventFilters, InputDialogFolder, InputFileLike, InputInlineResult, InputMediaLike, InputMediaSticker, InputMessageId, InputPeerLike, InputPrivacyRule, InputReaction, InputStickerSet, InputStickerSetItem, InputText, MaybeDynamic, Message, MessageEffect, MessageMedia, MessageReactions, ParametersSkip2, ParsedUpdate, PeerReaction, PeerStories, Photo, Poll, PollUpdate, PollVoteUpdate, PreCheckoutQuery, RawDocument, ReplyMarkup, SentCode, StarsStatus, StarsTransaction, Sticker, StickerSet, StickerSourceType, StickerType, StoriesStealthMode, Story, StoryInteractions, StoryUpdate, StoryViewer, StoryViewersList, TakeoutSession, TextWithEntities, TypingStatus, UploadFileLike, UploadedFile, User, UserStatusUpdate, UserTypingUpdate } from './types/index.js'
import type { StringSessionData } from './utils/string-session.js' import type { StringSessionData } from './utils/string-session.js'
import type { ITelegramStorageProvider } from './storage/provider.js' import type { ITelegramStorageProvider } from './storage/provider.js'
import { Conversation } from './types/conversation.js' import { Conversation } from './types/conversation.js'
@ -313,220 +312,65 @@ type TelegramClientOptions = (
} }
export interface TelegramClient extends ITelegramClient { export interface TelegramClient extends ITelegramClient {
/** /** Raw update emitter */
* Register a raw update handler readonly onRawUpdate: Emitter<RawUpdateInfo>
* /** Parsed update emitter */
* @param name Event name readonly onUpdate: Emitter<ParsedUpdate>/** a new message handler */
* @param handler Raw update handler readonly onNewMessage: Emitter<Message>
*/ /** an edit message handler */
on(name: 'raw_update', handler: ((upd: tl.TypeUpdate | tl.TypeMessage, peers: PeersIndex) => void)): this readonly onEditMessage: Emitter<Message>
/** /** a message group handler */
* Register a parsed update handler readonly onMessageGroup: Emitter<Message[]>
* /** a delete message handler */
* @param name Event name readonly onDeleteMessage: Emitter<DeleteMessageUpdate>
* @param handler Raw update handler /** a chat member update handler */
*/ readonly onChatMemberUpdate: Emitter<ChatMemberUpdate>
on(name: 'update', handler: ((upd: ParsedUpdate) => void)): this /** an inline query handler */
/** readonly onInlineQuery: Emitter<InlineQuery>
* Register a new message handler /** a chosen inline result handler */
* readonly onChosenInlineResult: Emitter<ChosenInlineResult>
* @param name Event name /** a callback query handler */
* @param handler New message handler readonly onCallbackQuery: Emitter<CallbackQuery>
*/ /** an inline callback query handler */
on(name: 'new_message', handler: ((upd: Message) => void)): this readonly onInlineCallbackQuery: Emitter<InlineCallbackQuery>
/** /** a business callback query handler */
* Register an edit message handler readonly onBusinessCallbackQuery: Emitter<BusinessCallbackQuery>
* /** a poll update handler */
* @param name Event name readonly onPollUpdate: Emitter<PollUpdate>
* @param handler Edit message handler /** a poll vote handler */
*/ readonly onPollVote: Emitter<PollVoteUpdate>
on(name: 'edit_message', handler: ((upd: Message) => void)): this /** an user status update handler */
/** readonly onUserStatusUpdate: Emitter<UserStatusUpdate>
* Register a message group handler /** an user typing handler */
* readonly onUserTyping: Emitter<UserTypingUpdate>
* @param name Event name /** a history read handler */
* @param handler Message group handler readonly onHistoryRead: Emitter<HistoryReadUpdate>
*/ /** a bot stopped handler */
on(name: 'message_group', handler: ((upd: Message[]) => void)): this readonly onBotStopped: Emitter<BotStoppedUpdate>
/** /** a bot chat join request handler */
* Register a delete message handler readonly onBotChatJoinRequest: Emitter<BotChatJoinRequestUpdate>
* /** a chat join request handler */
* @param name Event name readonly onChatJoinRequest: Emitter<ChatJoinRequestUpdate>
* @param handler Delete message handler /** a pre checkout query handler */
*/ readonly onPreCheckoutQuery: Emitter<PreCheckoutQuery>
on(name: 'delete_message', handler: ((upd: DeleteMessageUpdate) => void)): this /** a story update handler */
/** readonly onStoryUpdate: Emitter<StoryUpdate>
* Register a chat member update handler /** a delete story handler */
* readonly onDeleteStory: Emitter<DeleteStoryUpdate>
* @param name Event name /** a bot reaction update handler */
* @param handler Chat member update handler readonly onBotReactionUpdate: Emitter<BotReactionUpdate>
*/ /** a bot reaction count update handler */
on(name: 'chat_member', handler: ((upd: ChatMemberUpdate) => void)): this readonly onBotReactionCountUpdate: Emitter<BotReactionCountUpdate>
/** /** a business connection update handler */
* Register an inline query handler readonly onBusinessConnectionUpdate: Emitter<BusinessConnection>
* /** a new business message handler */
* @param name Event name readonly onNewBusinessMessage: Emitter<BusinessMessage>
* @param handler Inline query handler /** an edit business message handler */
*/ readonly onEditBusinessMessage: Emitter<BusinessMessage>
on(name: 'inline_query', handler: ((upd: InlineQuery) => void)): this /** a business message group handler */
/** readonly onBusinessMessageGroup: Emitter<BusinessMessage[]>
* Register a chosen inline result handler /** a delete business message handler */
* readonly onDeleteBusinessMessage: Emitter<DeleteBusinessMessageUpdate>
* @param name Event name
* @param handler Chosen inline result handler
*/
on(name: 'chosen_inline_result', handler: ((upd: ChosenInlineResult) => void)): this
/**
* Register a callback query handler
*
* @param name Event name
* @param handler Callback query handler
*/
on(name: 'callback_query', handler: ((upd: CallbackQuery) => void)): this
/**
* Register an inline callback query handler
*
* @param name Event name
* @param handler Inline callback query handler
*/
on(name: 'inline_callback_query', handler: ((upd: InlineCallbackQuery) => void)): this
/**
* Register a business callback query handler
*
* @param name Event name
* @param handler Business callback query handler
*/
on(name: 'business_callback_query', handler: ((upd: BusinessCallbackQuery) => void)): this
/**
* Register a poll update handler
*
* @param name Event name
* @param handler Poll update handler
*/
on(name: 'poll', handler: ((upd: PollUpdate) => void)): this
/**
* Register a poll vote handler
*
* @param name Event name
* @param handler Poll vote handler
*/
on(name: 'poll_vote', handler: ((upd: PollVoteUpdate) => void)): this
/**
* Register an user status update handler
*
* @param name Event name
* @param handler User status update handler
*/
on(name: 'user_status', handler: ((upd: UserStatusUpdate) => void)): this
/**
* Register an user typing handler
*
* @param name Event name
* @param handler User typing handler
*/
on(name: 'user_typing', handler: ((upd: UserTypingUpdate) => void)): this
/**
* Register a history read handler
*
* @param name Event name
* @param handler History read handler
*/
on(name: 'history_read', handler: ((upd: HistoryReadUpdate) => void)): this
/**
* Register a bot stopped handler
*
* @param name Event name
* @param handler Bot stopped handler
*/
on(name: 'bot_stopped', handler: ((upd: BotStoppedUpdate) => void)): this
/**
* Register a bot chat join request handler
*
* @param name Event name
* @param handler Bot chat join request handler
*/
on(name: 'bot_chat_join_request', handler: ((upd: BotChatJoinRequestUpdate) => void)): this
/**
* Register a chat join request handler
*
* @param name Event name
* @param handler Chat join request handler
*/
on(name: 'chat_join_request', handler: ((upd: ChatJoinRequestUpdate) => void)): this
/**
* Register a pre checkout query handler
*
* @param name Event name
* @param handler Pre checkout query handler
*/
on(name: 'pre_checkout_query', handler: ((upd: PreCheckoutQuery) => void)): this
/**
* Register a story update handler
*
* @param name Event name
* @param handler Story update handler
*/
on(name: 'story', handler: ((upd: StoryUpdate) => void)): this
/**
* Register a delete story handler
*
* @param name Event name
* @param handler Delete story handler
*/
on(name: 'delete_story', handler: ((upd: DeleteStoryUpdate) => void)): this
/**
* Register a bot reaction update handler
*
* @param name Event name
* @param handler Bot reaction update handler
*/
on(name: 'bot_reaction', handler: ((upd: BotReactionUpdate) => void)): this
/**
* Register a bot reaction count update handler
*
* @param name Event name
* @param handler Bot reaction count update handler
*/
on(name: 'bot_reaction_count', handler: ((upd: BotReactionCountUpdate) => void)): this
/**
* Register a business connection update handler
*
* @param name Event name
* @param handler Business connection update handler
*/
on(name: 'business_connection', handler: ((upd: BusinessConnection) => void)): this
/**
* Register a new business message handler
*
* @param name Event name
* @param handler New business message handler
*/
on(name: 'new_business_message', handler: ((upd: BusinessMessage) => void)): this
/**
* Register an edit business message handler
*
* @param name Event name
* @param handler Edit business message handler
*/
on(name: 'edit_business_message', handler: ((upd: BusinessMessage) => void)): this
/**
* Register a business message group handler
*
* @param name Event name
* @param handler Business message group handler
*/
on(name: 'business_message_group', handler: ((upd: BusinessMessage[]) => void)): this
/**
* Register a delete business message handler
*
* @param name Event name
* @param handler Delete business message handler
*/
on(name: 'delete_business_message', handler: ((upd: DeleteBusinessMessageUpdate) => void)): this
// eslint-disable-next-line ts/no-explicit-any
on(name: string, handler: (...args: any[]) => void): this
/** /**
* Wrap this client so that all RPC calls will use the specified parameters. * Wrap this client so that all RPC calls will use the specified parameters.
* *
@ -5750,48 +5594,156 @@ export type { TelegramClientOptions }
export * from './base.js' export * from './base.js'
export class TelegramClient extends EventEmitter implements ITelegramClient { export class TelegramClient implements ITelegramClient {
_client: ITelegramClient _client: ITelegramClient
constructor(opts: TelegramClientOptions) { constructor(opts: TelegramClientOptions) {
super() ;(this as any).onRawUpdate = new Emitter()
;(this as any).onUpdate = new Emitter()
;(this as any).onNewMessage = new Emitter()
;(this as any).onEditMessage = new Emitter()
;(this as any).onMessageGroup = new Emitter()
;(this as any).onDeleteMessage = new Emitter()
;(this as any).onChatMemberUpdate = new Emitter()
;(this as any).onInlineQuery = new Emitter()
;(this as any).onChosenInlineResult = new Emitter()
;(this as any).onCallbackQuery = new Emitter()
;(this as any).onInlineCallbackQuery = new Emitter()
;(this as any).onBusinessCallbackQuery = new Emitter()
;(this as any).onPollUpdate = new Emitter()
;(this as any).onPollVote = new Emitter()
;(this as any).onUserStatusUpdate = new Emitter()
;(this as any).onUserTyping = new Emitter()
;(this as any).onHistoryRead = new Emitter()
;(this as any).onBotStopped = new Emitter()
;(this as any).onBotChatJoinRequest = new Emitter()
;(this as any).onChatJoinRequest = new Emitter()
;(this as any).onPreCheckoutQuery = new Emitter()
;(this as any).onStoryUpdate = new Emitter()
;(this as any).onDeleteStory = new Emitter()
;(this as any).onBotReactionUpdate = new Emitter()
;(this as any).onBotReactionCountUpdate = new Emitter()
;(this as any).onBusinessConnectionUpdate = new Emitter()
;(this as any).onNewBusinessMessage = new Emitter()
;(this as any).onEditBusinessMessage = new Emitter()
;(this as any).onBusinessMessageGroup = new Emitter()
;(this as any).onDeleteBusinessMessage = new Emitter()
if ('client' in opts) { if ('client' in opts) {
this._client = opts.client this._client = opts.client
} else { } else {
if (!opts.storage || typeof opts.storage === 'string' || !opts.transport || !opts.crypto) { if (!opts.storage || typeof opts.storage === 'string' || !opts.transport || !opts.crypto || !opts.platform) {
throw new MtUnsupportedError( throw new MtUnsupportedError(
'You need to explicitly provide storage, transport and crypto for @mtcute/core', 'You need to explicitly provide storage, transport, crypto and platform for @mtcute/core',
) )
} }
this._client = new BaseTelegramClient(opts as BaseTelegramClientOptions) this._client = new BaseTelegramClient(opts as BaseTelegramClientOptions)
} }
// @ts-expect-error codegen Object.defineProperty(this, 'log', { value: this._client.log })
this.log = this._client.log Object.defineProperty(this, 'storage', { value: this._client.storage })
// @ts-expect-error codegen Object.defineProperty(this, 'stopSignal', { value: this._client.stopSignal })
this.storage = this._client.storage Object.defineProperty(this, 'appConfig', { value: this._client.appConfig })
Object.defineProperty(this, 'stopSignal', { Object.defineProperty(this, 'onServerUpdate', { value: this._client.onServerUpdate })
get: () => this._client.stopSignal, Object.defineProperty(this, 'onRawUpdate', { value: this._client.onServerUpdate })
}) Object.defineProperty(this, 'onConnectionState', { value: this._client.onConnectionState })
Object.defineProperty(this, 'appConfig', {
get: () => this._client.appConfig,
})
if (!opts.disableUpdates) { if (!opts.disableUpdates) {
const skipConversationUpdates = opts.skipConversationUpdates ?? true const skipConversationUpdates = opts.skipConversationUpdates ?? true
const { messageGroupingInterval } = opts.updates ?? {} const { messageGroupingInterval } = opts.updates ?? {}
this._client.onUpdate( this._client.onRawUpdate.add(
makeParsedUpdateHandler({ makeParsedUpdateHandler({
messageGroupingInterval, messageGroupingInterval,
onUpdate: (update) => { onUpdate: (update) => {
if (Conversation.handleUpdate(this, update) && skipConversationUpdates) return if (Conversation.handleUpdate(this, update) && skipConversationUpdates) return
this.emit('update', update) this.onUpdate.emit(update)
this.emit(update.name, update.data) switch (update.name) {
}, case 'new_message':
onRawUpdate: (update, peers) => { this.onNewMessage.emit(update.data)
this.emit('raw_update', update, peers) break
case 'edit_message':
this.onEditMessage.emit(update.data)
break
case 'message_group':
this.onMessageGroup.emit(update.data)
break
case 'delete_message':
this.onDeleteMessage.emit(update.data)
break
case 'chat_member':
this.onChatMemberUpdate.emit(update.data)
break
case 'inline_query':
this.onInlineQuery.emit(update.data)
break
case 'chosen_inline_result':
this.onChosenInlineResult.emit(update.data)
break
case 'callback_query':
this.onCallbackQuery.emit(update.data)
break
case 'inline_callback_query':
this.onInlineCallbackQuery.emit(update.data)
break
case 'business_callback_query':
this.onBusinessCallbackQuery.emit(update.data)
break
case 'poll':
this.onPollUpdate.emit(update.data)
break
case 'poll_vote':
this.onPollVote.emit(update.data)
break
case 'user_status':
this.onUserStatusUpdate.emit(update.data)
break
case 'user_typing':
this.onUserTyping.emit(update.data)
break
case 'history_read':
this.onHistoryRead.emit(update.data)
break
case 'bot_stopped':
this.onBotStopped.emit(update.data)
break
case 'bot_chat_join_request':
this.onBotChatJoinRequest.emit(update.data)
break
case 'chat_join_request':
this.onChatJoinRequest.emit(update.data)
break
case 'pre_checkout_query':
this.onPreCheckoutQuery.emit(update.data)
break
case 'story':
this.onStoryUpdate.emit(update.data)
break
case 'delete_story':
this.onDeleteStory.emit(update.data)
break
case 'bot_reaction':
this.onBotReactionUpdate.emit(update.data)
break
case 'bot_reaction_count':
this.onBotReactionCountUpdate.emit(update.data)
break
case 'business_connection':
this.onBusinessConnectionUpdate.emit(update.data)
break
case 'new_business_message':
this.onNewBusinessMessage.emit(update.data)
break
case 'edit_business_message':
this.onEditBusinessMessage.emit(update.data)
break
case 'business_message_group':
this.onBusinessMessageGroup.emit(update.data)
break
case 'delete_business_message':
this.onDeleteBusinessMessage.emit(update.data)
break
}
}, },
}), }),
) )
@ -6656,21 +6608,9 @@ TelegramClient.prototype.computeSrpParams = function (...args) {
TelegramClient.prototype.computeNewPasswordHash = function (...args) { TelegramClient.prototype.computeNewPasswordHash = function (...args) {
return this._client.computeNewPasswordHash(...args) return this._client.computeNewPasswordHash(...args)
} }
TelegramClient.prototype.onConnectionState = function (...args) {
return this._client.onConnectionState(...args)
}
TelegramClient.prototype.getServerUpdateHandler = function (...args) {
return this._client.getServerUpdateHandler(...args)
}
TelegramClient.prototype.changePrimaryDc = function (...args) { TelegramClient.prototype.changePrimaryDc = function (...args) {
return this._client.changePrimaryDc(...args) return this._client.changePrimaryDc(...args)
} }
TelegramClient.prototype.getMtprotoMessageId = function (...args) { TelegramClient.prototype.getMtprotoMessageId = function (...args) {
return this._client.getMtprotoMessageId(...args) return this._client.getMtprotoMessageId(...args)
} }
TelegramClient.prototype.onServerUpdate = function () {
throw new Error('onServerUpdate is not available for TelegramClient, use .on() methods instead')
}
TelegramClient.prototype.onUpdate = function () {
throw new Error('onUpdate is not available for TelegramClient, use .on() methods instead')
}

View file

@ -1,5 +1,6 @@
import type { tl } from '@mtcute/tl' import type { tl } from '@mtcute/tl'
import type Long from 'long' import type Long from 'long'
import type { Emitter } from '@fuman/utils'
import type { ConnectionKind, RpcCallOptions } from '../network/index.js' import type { ConnectionKind, RpcCallOptions } from '../network/index.js'
import type { MustEqual, PublicPart } from '../types/utils.js' import type { MustEqual, PublicPart } from '../types/utils.js'
@ -8,8 +9,8 @@ import type { ICorePlatform } from '../types/platform'
import type { AppConfigManager } from './managers/app-config-manager.js' import type { AppConfigManager } from './managers/app-config-manager.js'
import type { TelegramStorageManager } from './storage/storage.js' import type { TelegramStorageManager } from './storage/storage.js'
import type { RawUpdateHandler } from './updates/types.js'
import type { StringSessionData } from './utils/string-session.js' import type { StringSessionData } from './utils/string-session.js'
import type { RawUpdateInfo } from './updates/types.js'
/** /**
* Connection state of the client * Connection state of the client
@ -25,8 +26,6 @@ import type { StringSessionData } from './utils/string-session.js'
*/ */
export type ConnectionState = 'offline' | 'connecting' | 'updating' | 'connected' export type ConnectionState = 'offline' | 'connecting' | 'updating' | 'connected'
export type ServerUpdateHandler = (update: tl.TypeUpdates) => void
// NB: when adding new methods, don't forget to add them to: // NB: when adding new methods, don't forget to add them to:
// - worker/port.ts // - worker/port.ts
// - generate-client script // - generate-client script
@ -57,10 +56,9 @@ export interface ITelegramClient {
emitError(err: unknown): void emitError(err: unknown): void
handleClientUpdate(updates: tl.TypeUpdates, noDispatch?: boolean): void handleClientUpdate(updates: tl.TypeUpdates, noDispatch?: boolean): void
onServerUpdate(handler: ServerUpdateHandler): void onServerUpdate: Emitter<tl.TypeUpdates>
getServerUpdateHandler(): ServerUpdateHandler onRawUpdate: Emitter<RawUpdateInfo>
onUpdate(handler: RawUpdateHandler): void onConnectionState: Emitter<ConnectionState>
onConnectionState(handler: (state: ConnectionState) => void): void
getApiCrenetials(): Promise<{ id: number, hash: string }> getApiCrenetials(): Promise<{ id: number, hash: string }>
// todo - this is only used for file dl/ul, which should probably be moved // todo - this is only used for file dl/ul, which should probably be moved

View file

@ -14,6 +14,8 @@ import { BaseTelegramClient, BaseTelegramClientOptions } from '../base.js'
// @copy // @copy
import { ITelegramClient } from '../client.types.js' import { ITelegramClient } from '../client.types.js'
// @copy // @copy
import { RawUpdateInfo } from '../updates/types.js'
// @copy
import { import {
AllStories, AllStories,
ArrayPaginated, ArrayPaginated,

View file

@ -12,7 +12,6 @@ import { Conversation } from '../types/conversation.js'
import type { ParsedUpdateHandlerParams } from '../updates/parsed.js' import type { ParsedUpdateHandlerParams } from '../updates/parsed.js'
// @copy // @copy
import { makeParsedUpdateHandler } from '../updates/parsed.js' import { makeParsedUpdateHandler } from '../updates/parsed.js'
// @copy // @copy
type TelegramClientOptions = ( type TelegramClientOptions = (
| (PartialOnly<Omit<BaseTelegramClientOptions, 'storage'>, 'transport' | 'crypto' | 'platform'> & { | (PartialOnly<Omit<BaseTelegramClientOptions, 'storage'>, 'transport' | 'crypto' | 'platform'> & {
@ -52,41 +51,35 @@ function _initializeClient(this: TelegramClient, opts: TelegramClientOptions) {
if ('client' in opts) { if ('client' in opts) {
this._client = opts.client this._client = opts.client
} else { } else {
if (!opts.storage || typeof opts.storage === 'string' || !opts.transport || !opts.crypto) { if (!opts.storage || typeof opts.storage === 'string' || !opts.transport || !opts.crypto || !opts.platform) {
throw new MtUnsupportedError( throw new MtUnsupportedError(
'You need to explicitly provide storage, transport and crypto for @mtcute/core', 'You need to explicitly provide storage, transport, crypto and platform for @mtcute/core',
) )
} }
this._client = new BaseTelegramClient(opts as BaseTelegramClientOptions) this._client = new BaseTelegramClient(opts as BaseTelegramClientOptions)
} }
// @ts-expect-error codegen Object.defineProperty(this, 'log', { value: this._client.log })
this.log = this._client.log Object.defineProperty(this, 'storage', { value: this._client.storage })
// @ts-expect-error codegen Object.defineProperty(this, 'stopSignal', { value: this._client.stopSignal })
this.storage = this._client.storage Object.defineProperty(this, 'appConfig', { value: this._client.appConfig })
Object.defineProperty(this, 'stopSignal', { Object.defineProperty(this, 'onServerUpdate', { value: this._client.onServerUpdate })
get: () => this._client.stopSignal, Object.defineProperty(this, 'onRawUpdate', { value: this._client.onServerUpdate })
}) Object.defineProperty(this, 'onConnectionState', { value: this._client.onConnectionState })
Object.defineProperty(this, 'appConfig', {
get: () => this._client.appConfig,
})
if (!opts.disableUpdates) { if (!opts.disableUpdates) {
const skipConversationUpdates = opts.skipConversationUpdates ?? true const skipConversationUpdates = opts.skipConversationUpdates ?? true
const { messageGroupingInterval } = opts.updates ?? {} const { messageGroupingInterval } = opts.updates ?? {}
this._client.onUpdate( this._client.onRawUpdate.add(
makeParsedUpdateHandler({ makeParsedUpdateHandler({
messageGroupingInterval, messageGroupingInterval,
onUpdate: (update) => { onUpdate: (update) => {
if (Conversation.handleUpdate(this, update) && skipConversationUpdates) return if (Conversation.handleUpdate(this, update) && skipConversationUpdates) return
this.emit('update', update) this.onUpdate.emit(update)
this.emit(update.name, update.data) // @generate-update-emitter
},
onRawUpdate: (update, peers) => {
this.emit('raw_update', update, peers)
}, },
}), }),
) )

View file

@ -1,8 +1,5 @@
import type { tl } from '@mtcute/tl'
import type { import type {
ParsedUpdate, ParsedUpdate,
PeersIndex,
} from '../index.js' } from '../index.js'
import { import {
BotChatJoinRequestUpdate, BotChatJoinRequestUpdate,
@ -30,9 +27,10 @@ import {
UserStatusUpdate, UserStatusUpdate,
UserTypingUpdate, UserTypingUpdate,
} from '../index.js' } from '../index.js'
import type { RawUpdateInfo } from '../../updates/types.js'
/** @internal */ /** @internal */
export function _parseUpdate(update: tl.TypeUpdate, peers: PeersIndex): ParsedUpdate | null { export function _parseUpdate({ update, peers }: RawUpdateInfo): ParsedUpdate | null {
switch (update._) { switch (update._) {
case 'updateNewMessage': case 'updateNewMessage':
case 'updateNewChannelMessage': case 'updateNewChannelMessage':

View file

@ -22,7 +22,7 @@ import type { CurrentUserInfo } from '../storage/service/current-user.js'
import { PeersIndex } from '../types/peers/peers-index.js' import { PeersIndex } from '../types/peers/peers-index.js'
import { _getChannelsBatched } from '../methods/chats/batched-queries.js' import { _getChannelsBatched } from '../methods/chats/batched-queries.js'
import type { PendingUpdate, PendingUpdateContainer, RawUpdateHandler, UpdatesManagerParams } from './types.js' import { type PendingUpdate, type PendingUpdateContainer, RawUpdateInfo, type UpdatesManagerParams } from './types.js'
import { import {
createDummyUpdatesContainer, createDummyUpdatesContainer,
extractChannelIdFromUpdate, extractChannelIdFromUpdate,
@ -146,7 +146,6 @@ export class UpdatesManager {
channelsOpened: Map<number, number> = new Map() channelsOpened: Map<number, number> = new Map()
log: Logger log: Logger
private _handler: RawUpdateHandler = () => {}
private _onCatchingUp: (catchingUp: boolean) => void = () => {} private _onCatchingUp: (catchingUp: boolean) => void = () => {}
@ -186,14 +185,6 @@ export class UpdatesManager {
} }
} }
setHandler(handler: RawUpdateHandler): void {
this._handler = handler
}
getHandler(): RawUpdateHandler {
return this._handler
}
onCatchingUp(handler: (catchingUp: boolean) => void): void { onCatchingUp(handler: (catchingUp: boolean) => void): void {
this._onCatchingUp = handler this._onCatchingUp = handler
} }
@ -1352,7 +1343,7 @@ export class UpdatesManager {
} }
log.debug('dispatching %s (postponed = %s)', upd._, postponed) log.debug('dispatching %s (postponed = %s)', upd._, postponed)
this._handler(upd, pending.peers) client.onRawUpdate.emit(new RawUpdateInfo(upd, pending.peers))
} }
async _loop(): Promise<void> { async _loop(): Promise<void> {

View file

@ -4,7 +4,7 @@ import type { Message } from '../types/messages/index.js'
import type { BusinessMessage, ParsedUpdate } from '../types/updates/index.js' import type { BusinessMessage, ParsedUpdate } from '../types/updates/index.js'
import { _parseUpdate } from '../types/updates/parse-update.js' import { _parseUpdate } from '../types/updates/parse-update.js'
import type { RawUpdateHandler } from './types.js' import type { RawUpdateInfo } from './types.js'
export interface ParsedUpdateHandlerParams { export interface ParsedUpdateHandlerParams {
/** /**
@ -29,32 +29,23 @@ export interface ParsedUpdateHandlerParams {
/** Handler for parsed updates */ /** Handler for parsed updates */
onUpdate: (update: ParsedUpdate) => void onUpdate: (update: ParsedUpdate) => void
/**
* Handler for raw updates.
*
* Note that this handler will be called **before** the parsed update handler.
*/
onRawUpdate?: RawUpdateHandler
} }
export function makeParsedUpdateHandler(params: ParsedUpdateHandlerParams): RawUpdateHandler { export function makeParsedUpdateHandler(params: ParsedUpdateHandlerParams): (update: RawUpdateInfo) => void {
const { messageGroupingInterval, onUpdate, onRawUpdate = () => {} } = params const { messageGroupingInterval, onUpdate } = params
if (!messageGroupingInterval) { if (!messageGroupingInterval) {
return (update, peers) => { return (info) => {
const parsed = _parseUpdate(update, peers) const parsed = _parseUpdate(info)
onRawUpdate(update, peers)
if (parsed) onUpdate(parsed) if (parsed) onUpdate(parsed)
} }
} }
const pending = new Map<string, [Message[], timers.Timer]>() const pending = new Map<string, [Message[], timers.Timer]>()
return (update, peers) => { return (info) => {
const parsed = _parseUpdate(update, peers) const parsed = _parseUpdate(info)
onRawUpdate(update, peers)
if (parsed) { if (parsed) {
if (parsed.name === 'new_message' || parsed.name === 'new_business_message') { if (parsed.name === 'new_message' || parsed.name === 'new_business_message') {

View file

@ -5,14 +5,15 @@ import type { EarlyTimer, Logger, SortedLinkedList } from '../../utils/index.js'
import type { CurrentUserInfo } from '../storage/service/current-user.js' import type { CurrentUserInfo } from '../storage/service/current-user.js'
import type { PeersIndex } from '../types/peers/peers-index.js' import type { PeersIndex } from '../types/peers/peers-index.js'
/** /** Information about a raw update. */
* Function to be called for each update. export class RawUpdateInfo {
* constructor(
* @param upd The update /** The update */
* @param peers Peers that are present in the update readonly update: tl.TypeUpdate,
*/ /** Peers that are present in the update */
export type RawUpdateHandler = (upd: tl.TypeUpdate, peers: PeersIndex) => void readonly peers: PeersIndex,
) {}
}
/** /**
* Parameters for the updates manager * Parameters for the updates manager
*/ */
@ -167,6 +168,5 @@ export interface UpdatesState {
log: Logger log: Logger
stop: () => void stop: () => void
handler: RawUpdateHandler
auth: CurrentUserInfo | null auth: CurrentUserInfo | null
} }

View file

@ -1,12 +1,10 @@
// eslint-disable-next-line unicorn/prefer-node-protocol
import EventEmitter from 'events'
import type { mtp } from '@mtcute/tl' import type { mtp } from '@mtcute/tl'
import { tl } from '@mtcute/tl' import { tl } from '@mtcute/tl'
import { __tlReaderMap as defaultReaderMap } from '@mtcute/tl/binary/reader.js' import { __tlReaderMap as defaultReaderMap } from '@mtcute/tl/binary/reader.js'
import { __tlWriterMap as defaultWriterMap } from '@mtcute/tl/binary/writer.js' import { __tlWriterMap as defaultWriterMap } from '@mtcute/tl/binary/writer.js'
import type { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime' import type { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime'
import type { ReconnectionStrategy } from '@fuman/net' import type { ReconnectionStrategy } from '@fuman/net'
import { Emitter } from '@fuman/utils'
import type { IMtStorageProvider } from '../storage/provider.js' import type { IMtStorageProvider } from '../storage/provider.js'
import type { StorageManagerExtraOptions } from '../storage/storage.js' import type { StorageManagerExtraOptions } from '../storage/storage.js'
@ -180,7 +178,7 @@ export interface MtClientOptions {
* to make RPC calls and receive low-level updates, as well as providing * to make RPC calls and receive low-level updates, as well as providing
* some APIs to manage that. * some APIs to manage that.
*/ */
export class MtClient extends EventEmitter { export class MtClient {
/** /**
* Crypto provider taken from {@link MtClientOptions.crypto} * Crypto provider taken from {@link MtClientOptions.crypto}
*/ */
@ -223,9 +221,12 @@ export class MtClient extends EventEmitter {
private _abortController: AbortController private _abortController: AbortController
readonly stopSignal: AbortSignal readonly stopSignal: AbortSignal
constructor(readonly params: MtClientOptions) { readonly onUsable: Emitter<void> = new Emitter()
super() readonly onConnecting: Emitter<void> = new Emitter()
readonly onNetworkChanged: Emitter<boolean> = new Emitter()
readonly onUpdate: Emitter<tl.TypeUpdates> = new Emitter()
constructor(readonly params: MtClientOptions) {
this.log = params.logger ?? new LogManager(undefined, params.platform) this.log = params.logger ?? new LogManager(undefined, params.platform)
if (params.logLevel !== undefined) { if (params.logLevel !== undefined) {
@ -281,10 +282,10 @@ export class MtClient extends EventEmitter {
isPremium: false, isPremium: false,
useIpv6: Boolean(params.useIpv6), useIpv6: Boolean(params.useIpv6),
enableErrorReporting: params.enableErrorReporting ?? false, enableErrorReporting: params.enableErrorReporting ?? false,
onUsable: () => this.emit('usable'), onUsable: this.onUsable.emit.bind(this.onUsable),
onConnecting: () => this.emit('connecting'), onConnecting: this.onConnecting.emit.bind(this.onConnecting),
onNetworkChanged: connected => this.emit('networkChanged', connected), onNetworkChanged: this.onNetworkChanged.emit.bind(this.onNetworkChanged),
onUpdate: upd => this.emit('update', upd), onUpdate: this.onUpdate.emit.bind(this.onUpdate),
stopSignal: this.stopSignal, stopSignal: this.stopSignal,
platform: params.platform, platform: params.platform,
...params.network, ...params.network,

View file

@ -1,8 +1,5 @@
// eslint-disable-next-line unicorn/prefer-node-protocol
import EventEmitter from 'events'
import type { mtp, tl } from '@mtcute/tl' import type { mtp, tl } from '@mtcute/tl'
import { Deferred } from '@fuman/utils' import { Deferred, Emitter } from '@fuman/utils'
import type { Logger } from '../utils/index.js' import type { Logger } from '../utils/index.js'
@ -11,18 +8,29 @@ import type { SessionConnectionParams } from './session-connection.js'
import { SessionConnection } from './session-connection.js' import { SessionConnection } from './session-connection.js'
import type { TelegramTransport } from './transports' import type { TelegramTransport } from './transports'
export class MultiSessionConnection extends EventEmitter { export class MultiSessionConnection {
private _log: Logger private _log: Logger
readonly _sessions: MtprotoSession[] readonly _sessions: MtprotoSession[]
private _enforcePfs = false private _enforcePfs = false
// NB: dont forget to update .reset()
readonly onRequestKeys: Emitter<Deferred<void>> = new Emitter()
readonly onError: Emitter<Error> = new Emitter()
readonly onUpdate: Emitter<tl.TypeUpdates> = new Emitter()
readonly onKeyChange: Emitter<[number, Uint8Array | null]> = new Emitter()
readonly onTmpKeyChange: Emitter<[number, Uint8Array | null, number]> = new Emitter()
readonly onFutureSalts: Emitter<mtp.RawMt_future_salt[]> = new Emitter()
readonly onAuthBegin: Emitter<number> = new Emitter()
readonly onUsable: Emitter<number> = new Emitter()
readonly onWait: Emitter<number> = new Emitter()
readonly onRequestAuth: Emitter<number> = new Emitter()
constructor( constructor(
readonly params: SessionConnectionParams, readonly params: SessionConnectionParams,
private _count: number, private _count: number,
log: Logger, log: Logger,
logPrefix = '', logPrefix = '',
) { ) {
super()
this._log = log.create('multi') this._log = log.create('multi')
if (logPrefix) this._log.prefix = `[${logPrefix}] ` if (logPrefix) this._log.prefix = `[${logPrefix}] `
this._enforcePfs = _count > 1 && params.isMainConnection this._enforcePfs = _count > 1 && params.isMainConnection
@ -119,7 +127,6 @@ export class MultiSessionConnection extends EventEmitter {
if (this._connections.length > this._count) { if (this._connections.length > this._count) {
// destroy extra connections // destroy extra connections
for (let i = this._connections.length - 1; i >= this._count; i--) { for (let i = this._connections.length - 1; i >= this._count; i--) {
this._connections[i].removeAllListeners()
this._connections[i].destroy().catch((err) => { this._connections[i].destroy().catch((err) => {
this._log.warn('error destroying connection: %e', err) this._log.warn('error destroying connection: %e', err)
}) })
@ -133,7 +140,7 @@ export class MultiSessionConnection extends EventEmitter {
if (enforcePfsChanged) { if (enforcePfsChanged) {
// we need to fetch new auth keys first // we need to fetch new auth keys first
const promise = new Deferred<void>() const promise = new Deferred<void>()
this.emit('request-keys', promise) this.onRequestKeys.emit(promise)
promise.promise promise.promise
.then(() => { .then(() => {
@ -144,7 +151,7 @@ export class MultiSessionConnection extends EventEmitter {
}) })
}) })
.catch((err) => { .catch((err) => {
this.emit('error', err) this.onError.emit(err)
}) })
} }
@ -164,11 +171,11 @@ export class MultiSessionConnection extends EventEmitter {
) )
if (this.params.isMainConnection && this.params.isMainDcConnection) { if (this.params.isMainConnection && this.params.isMainDcConnection) {
conn.on('update', update => this.emit('update', update)) conn.onUpdate.add(update => this.onUpdate.emit(update))
} }
conn.on('error', err => this.emit('error', err, conn)) conn.onError.add(err => this.onError.emit(err))
conn.on('key-change', (key) => { conn.onKeyChange.add((key) => {
this.emit('key-change', i, key) this.onKeyChange.emit([i, key])
// notify other connections // notify other connections
for (const conn_ of this._connections) { for (const conn_ of this._connections) {
@ -176,11 +183,13 @@ export class MultiSessionConnection extends EventEmitter {
conn_.onConnected() conn_.onConnected()
} }
}) })
conn.on('tmp-key-change', (key, expires) => this.emit('tmp-key-change', i, key, expires)) conn.onTmpKeyChange.add(
conn.on('future-salts', salts => this.emit('future-salts', salts)) event => this.onTmpKeyChange.emit(event === null ? [i, null, 0] : [i, event[0], event[1]]),
conn.on('auth-begin', () => { )
conn.onFutureSalts.add(salts => this.onFutureSalts.emit(salts))
conn.onAuthBegin.add(() => {
this._log.debug('received auth-begin from connection %d', i) this._log.debug('received auth-begin from connection %d', i)
this.emit('auth-begin', i) this.onAuthBegin.emit(i)
// we need to reset temp auth keys if there are any left // we need to reset temp auth keys if there are any left
@ -189,10 +198,10 @@ export class MultiSessionConnection extends EventEmitter {
if (conn_ !== conn) conn_.reconnect() if (conn_ !== conn) conn_.reconnect()
}) })
}) })
conn.on('usable', () => this.emit('usable', i)) conn.onUsable.add(() => this.onUsable.emit(i))
conn.on('wait', () => this.emit('wait', i)) conn.onWait.add(() => this.onWait.emit(i))
conn.on('request-auth', () => this.emit('request-auth', i)) conn.onRequestAuth.add(() => this.onRequestAuth.emit(i))
conn.on('flood-done', () => { conn.onFloodDone.add(() => {
this._log.debug('received flood-done from connection %d', i) this._log.debug('received flood-done from connection %d', i)
this._connections.forEach(it => it.flushWhenIdle()) this._connections.forEach(it => it.flushWhenIdle())
@ -208,7 +217,17 @@ export class MultiSessionConnection extends EventEmitter {
async destroy(): Promise<void> { async destroy(): Promise<void> {
await Promise.all(this._connections.map(conn => conn.destroy())) await Promise.all(this._connections.map(conn => conn.destroy()))
this._sessions.forEach(sess => sess.reset()) this._sessions.forEach(sess => sess.reset())
this.removeAllListeners()
this.onRequestKeys.clear()
this.onError.clear()
this.onUpdate.clear()
this.onKeyChange.clear()
this.onTmpKeyChange.clear()
this.onFutureSalts.clear()
this.onAuthBegin.clear()
this.onUsable.clear()
this.onWait.clear()
this.onRequestAuth.clear()
this._destroyed = true this._destroyed = true
} }

View file

@ -16,7 +16,7 @@ import type { ConfigManager } from './config-manager.js'
import { basic as defaultMiddlewares } from './middlewares/default.js' import { basic as defaultMiddlewares } from './middlewares/default.js'
import { MultiSessionConnection } from './multi-session-connection.js' import { MultiSessionConnection } from './multi-session-connection.js'
import { ServerSaltManager } from './server-salt.js' import { ServerSaltManager } from './server-salt.js'
import type { SessionConnection, SessionConnectionParams } from './session-connection.js' import type { SessionConnectionParams } from './session-connection.js'
import type { TelegramTransport } from './transports/abstract.js' import type { TelegramTransport } from './transports/abstract.js'
export type ConnectionKind = 'main' | 'upload' | 'download' | 'downloadSmall' export type ConnectionKind = 'main' | 'upload' | 'download' | 'downloadSmall'
@ -43,7 +43,7 @@ export interface NetworkManagerParams {
readerMap: TlReaderMap readerMap: TlReaderMap
writerMap: TlWriterMap writerMap: TlWriterMap
isPremium: boolean isPremium: boolean
emitError: (err: Error, connection?: SessionConnection) => void emitError: (err: Error) => void
onUpdate: (upd: tl.TypeUpdates) => void onUpdate: (upd: tl.TypeUpdates) => void
onUsable: () => void onUsable: () => void
onConnecting: () => void onConnecting: () => void
@ -314,7 +314,7 @@ export class DcConnectionManager {
private _setupMulti(kind: ConnectionKind): void { private _setupMulti(kind: ConnectionKind): void {
const connection = this[kind] const connection = this[kind]
connection.on('key-change', (idx, key: Uint8Array | null) => { connection.onKeyChange.add(([idx, key]) => {
if (kind !== 'main') { if (kind !== 'main') {
// main connection is responsible for authorization, // main connection is responsible for authorization,
// and keys are then sent to other connections // and keys are then sent to other connections
@ -340,7 +340,7 @@ export class DcConnectionManager {
this.manager.params.emitError(e) this.manager.params.emitError(e)
}) })
}) })
connection.on('tmp-key-change', (idx: number, key: Uint8Array | null, expires: number) => { connection.onTmpKeyChange.add(([idx, key, expires]) => {
if (kind !== 'main') { if (kind !== 'main') {
this.manager._log.warn('got tmp-key-change from non-main connection, ignoring') this.manager._log.warn('got tmp-key-change from non-main connection, ignoring')
@ -365,13 +365,13 @@ export class DcConnectionManager {
this.manager.params.emitError(e) this.manager.params.emitError(e)
}) })
}) })
connection.on('future-salts', (salts: mtp.RawMt_future_salt[]) => { connection.onFutureSalts.add((salts: mtp.RawMt_future_salt[]) => {
Promise.resolve(this.manager._storage.salts.store(this.dcId, salts)).catch((e: Error) => Promise.resolve(this.manager._storage.salts.store(this.dcId, salts)).catch((e: Error) =>
this.manager.params.emitError(e), this.manager.params.emitError(e),
) )
}) })
connection.on('auth-begin', () => { connection.onAuthBegin.add(() => {
// we need to propagate auth-begin to all connections // we need to propagate auth-begin to all connections
// to avoid them sending requests before auth is complete // to avoid them sending requests before auth is complete
if (kind !== 'main') { if (kind !== 'main') {
@ -387,19 +387,19 @@ export class DcConnectionManager {
this.downloadSmall.resetAuthKeys() this.downloadSmall.resetAuthKeys()
}) })
connection.on('request-auth', () => { connection.onRequestAuth.add(() => {
this.main.requestAuth() this.main.requestAuth()
}) })
// fucking awesome architecture, but whatever // fucking awesome architecture, but whatever
connection.on('request-keys', (promise: Deferred<void>) => { connection.onRequestKeys.add((promise: Deferred<void>) => {
this.loadKeys(true) this.loadKeys(true)
.then(() => promise.resolve()) .then(() => promise.resolve())
.catch((e: Error) => promise.reject(e)) .catch((e: Error) => promise.reject(e))
}) })
connection.on('error', (err: Error, conn: SessionConnection) => { connection.onError.add((err: Error) => {
this.manager.params.emitError(err, conn) this.manager.params.emitError(err)
}) })
} }
@ -562,15 +562,15 @@ export class NetworkManager {
this.params.onConnecting() this.params.onConnecting()
dc.main.on('usable', () => { dc.main.onUsable.add(() => {
if (dc !== this._primaryDc) return if (dc !== this._primaryDc) return
this.params.onUsable() this.params.onUsable()
}) })
dc.main.on('wait', () => { dc.main.onWait.add(() => {
if (dc !== this._primaryDc) return if (dc !== this._primaryDc) return
this.params.onConnecting() this.params.onConnecting()
}) })
dc.main.on('update', (update: tl.TypeUpdates) => { dc.main.onUpdate.add((update: tl.TypeUpdates) => {
this._updateHandler(update, false) this._updateHandler(update, false)
}) })

View file

@ -1,10 +1,7 @@
// eslint-disable-next-line unicorn/prefer-node-protocol
import EventEmitter from 'events'
import type { ReconnectionStrategy } from '@fuman/net' import type { ReconnectionStrategy } from '@fuman/net'
import { PersistentConnection as FumanPersistentConnection } from '@fuman/net' import { PersistentConnection as FumanPersistentConnection } from '@fuman/net'
import { FramedReader, FramedWriter } from '@fuman/io' import { FramedReader, FramedWriter } from '@fuman/io'
import { timers } from '@fuman/utils' import { Emitter, timers } from '@fuman/utils'
import type { BasicDcOption, ICryptoProvider, Logger } from '../utils/index.js' import type { BasicDcOption, ICryptoProvider, Logger } from '../utils/index.js'
@ -26,7 +23,7 @@ let nextConnectionUid = 0
* Only used for {@link PersistentConnection} and used as a mean of code splitting. * Only used for {@link PersistentConnection} and used as a mean of code splitting.
* This class doesn't know anything about MTProto, it just manages the transport. * This class doesn't know anything about MTProto, it just manages the transport.
*/ */
export abstract class PersistentConnection extends EventEmitter { export abstract class PersistentConnection {
private _uid = nextConnectionUid++ private _uid = nextConnectionUid++
readonly params: PersistentConnectionParams readonly params: PersistentConnectionParams
@ -45,10 +42,13 @@ export abstract class PersistentConnection extends EventEmitter {
_destroyed = false _destroyed = false
_usable = false _usable = false
readonly onWait: Emitter<number> = new Emitter()
readonly onUsable: Emitter<void> = new Emitter()
readonly onError: Emitter<Error> = new Emitter()
protected abstract onConnected(): void protected abstract onConnected(): void
protected abstract onClosed(): void protected abstract onClosed(): void
protected abstract handleError(err: Error): void
protected abstract onError(err: Error): void
protected abstract onMessage(data: Uint8Array): void protected abstract onMessage(data: Uint8Array): void
@ -56,7 +56,6 @@ export abstract class PersistentConnection extends EventEmitter {
params: PersistentConnectionParams, params: PersistentConnectionParams,
readonly log: Logger, readonly log: Logger,
) { ) {
super()
this.params = params this.params = params
this.params.transport.setup?.(this.params.crypto, log) this.params.transport.setup?.(this.params.crypto, log)
@ -76,7 +75,7 @@ export abstract class PersistentConnection extends EventEmitter {
onWait: (wait) => { onWait: (wait) => {
this._updateLogPrefix() this._updateLogPrefix()
this.log.debug('waiting for %d ms before reconnecting', wait) this.log.debug('waiting for %d ms before reconnecting', wait)
this.emit('wait', wait) this.onWait.emit(wait)
}, },
}) })
@ -125,7 +124,7 @@ export abstract class PersistentConnection extends EventEmitter {
} }
this._rescheduleInactivity() this._rescheduleInactivity()
this.emit('usable') // is this needed? this.onUsable.emit() // is this needed?
this.onConnected() this.onConnected()
while (true) { while (true) {
@ -148,7 +147,7 @@ export abstract class PersistentConnection extends EventEmitter {
private async _onError(err: Error) { private async _onError(err: Error) {
this._updateLogPrefix() this._updateLogPrefix()
this.onError(err) this.handleError(err)
return 'reconnect' as const return 'reconnect' as const
} }

View file

@ -3,7 +3,7 @@ import type { mtp } from '@mtcute/tl'
import { tl } from '@mtcute/tl' import { tl } from '@mtcute/tl'
import type { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime' import type { TlReaderMap, TlWriterMap } from '@mtcute/tl-runtime'
import { TlBinaryReader, TlBinaryWriter, TlSerializationCounter } from '@mtcute/tl-runtime' import { TlBinaryReader, TlBinaryWriter, TlSerializationCounter } from '@mtcute/tl-runtime'
import { Deferred, u8 } from '@fuman/utils' import { Deferred, Emitter, u8 } from '@fuman/utils'
import { MtArgumentError, MtTimeoutError, MtcuteError } from '../types/index.js' import { MtArgumentError, MtTimeoutError, MtcuteError } from '../types/index.js'
import { createAesIgeForMessageOld } from '../utils/crypto/mtproto.js' import { createAesIgeForMessageOld } from '../utils/crypto/mtproto.js'
@ -88,6 +88,16 @@ export class SessionConnection extends PersistentConnection {
// https://github.com/tdlib/td/blob/91aa6c9e4d0774eabf4f8d7f3aa51239032059a6/td/mtproto/SessionConnection.h // https://github.com/tdlib/td/blob/91aa6c9e4d0774eabf4f8d7f3aa51239032059a6/td/mtproto/SessionConnection.h
private _pingInterval: number private _pingInterval: number
// NB: dont forget to update .reset()
readonly onDisconnect: Emitter<void> = new Emitter()
readonly onKeyChange: Emitter<Uint8Array | null> = new Emitter()
readonly onTmpKeyChange: Emitter<[Uint8Array, number] | null> = new Emitter()
readonly onFloodDone: Emitter<void> = new Emitter()
readonly onRequestAuth: Emitter<void> = new Emitter()
readonly onAuthBegin: Emitter<void> = new Emitter()
readonly onUpdate: Emitter<tl.TypeUpdates> = new Emitter()
readonly onFutureSalts: Emitter<mtp.RawMt_future_salt[]> = new Emitter()
constructor( constructor(
params: SessionConnectionParams, params: SessionConnectionParams,
readonly _session: MtprotoSession, readonly _session: MtprotoSession,
@ -148,7 +158,7 @@ export class SessionConnection extends PersistentConnection {
} }
} }
this.emit('disconnect') this.onDisconnect.emit()
this.reset() this.reset()
} }
@ -165,8 +175,18 @@ export class SessionConnection extends PersistentConnection {
if (forever) { if (forever) {
timers.clearTimeout(this._pfsUpdateTimeout) timers.clearTimeout(this._pfsUpdateTimeout)
this.removeAllListeners() this.onDisconnect.clear()
this.on('error', (err) => { this.onKeyChange.clear()
this.onTmpKeyChange.clear()
this.onFloodDone.clear()
this.onRequestAuth.clear()
this.onAuthBegin.clear()
this.onUpdate.clear()
this.onFutureSalts.clear()
this.onWait.clear()
this.onUsable.clear()
this.onError.clear()
this.onError.add((err) => {
this.log.warn('caught error after destroying: %s', err) this.log.warn('caught error after destroying: %s', err)
}) })
} }
@ -202,7 +222,7 @@ export class SessionConnection extends PersistentConnection {
this.onConnectionUsable() this.onConnectionUsable()
} }
protected onError(error: Error): void { protected handleError(error: Error): void {
// https://core.telegram.org/mtproto/mtproto-_transports#_transport-errors // https://core.telegram.org/mtproto/mtproto-_transports#_transport-errors
if (error instanceof TransportError) { if (error instanceof TransportError) {
if (error.code === 404) { if (error.code === 404) {
@ -241,8 +261,8 @@ export class SessionConnection extends PersistentConnection {
this.log.info('transport error 404, reauthorizing') this.log.info('transport error 404, reauthorizing')
this._session.resetAuthKey() this._session.resetAuthKey()
this._resetSession() this._resetSession()
this.emit('key-change', null) this.onKeyChange.emit(null)
this.emit('error', error) this.onError.emit(error)
return return
} }
@ -252,13 +272,13 @@ export class SessionConnection extends PersistentConnection {
this._onAllFailed(`transport error ${error.code}`) this._onAllFailed(`transport error ${error.code}`)
if (error.code === 429) { if (error.code === 429) {
this._session.onTransportFlood(this.emit.bind(this, 'flood-done')) this._session.onTransportFlood(() => this.onFloodDone.emit())
return return
} }
} }
this.emit('error', error) this.onError.emit(error)
} }
protected onConnectionUsable(): void { protected onConnectionUsable(): void {
@ -285,13 +305,13 @@ export class SessionConnection extends PersistentConnection {
if (!this.params.isMainConnection) { if (!this.params.isMainConnection) {
// we don't authorize on non-main connections // we don't authorize on non-main connections
this.log.debug('_authorize(): non-main connection, requesting...') this.log.debug('_authorize(): non-main connection, requesting...')
this.emit('request-auth') this.onRequestAuth.emit()
return return
} }
this._session.authorizationPending = true this._session.authorizationPending = true
this.emit('auth-begin') this.onAuthBegin.emit()
doAuthorization(this, this._crypto) doAuthorization(this, this._crypto)
.then(([authKey, serverSalt, timeOffset]) => { .then(([authKey, serverSalt, timeOffset]) => {
@ -301,7 +321,7 @@ export class SessionConnection extends PersistentConnection {
this._session.authorizationPending = false this._session.authorizationPending = false
this.emit('key-change', authKey) this.onKeyChange.emit(authKey)
if (this._usePfs) { if (this._usePfs) {
return this._authorizePfs() return this._authorizePfs()
@ -312,7 +332,7 @@ export class SessionConnection extends PersistentConnection {
this._session.authorizationPending = false this._session.authorizationPending = false
if (this._destroyed) return if (this._destroyed) return
this.log.error('Authorization error: %e', err) this.log.error('Authorization error: %e', err)
this.onError(err) this.handleError(err)
this.reconnect() this.reconnect()
}) })
} }
@ -464,7 +484,7 @@ export class SessionConnection extends PersistentConnection {
// we must re-init connection after binding temp key // we must re-init connection after binding temp key
this._session.initConnectionCalled = false this._session.initConnectionCalled = false
this.emit('tmp-key-change', tempAuthKey, inner.expiresAt) this.onTmpKeyChange.emit([tempAuthKey, inner.expiresAt])
this.onConnectionUsable() this.onConnectionUsable()
// set a timeout to update temp auth key in advance to avoid interruption // set a timeout to update temp auth key in advance to avoid interruption
@ -489,7 +509,7 @@ export class SessionConnection extends PersistentConnection {
} }
this._isPfsBindingPending = false this._isPfsBindingPending = false
this.onError(err) this.handleError(err)
this.reconnect() this.reconnect()
}) })
} }
@ -661,7 +681,7 @@ export class SessionConnection extends PersistentConnection {
break break
} }
this.emit('update', message) this.onUpdate.emit(message)
return return
} }
@ -1145,7 +1165,7 @@ export class SessionConnection extends PersistentConnection {
// force the client to fetch missed updates // force the client to fetch missed updates
// when _lastSessionCreatedUid == 0, the connection has // when _lastSessionCreatedUid == 0, the connection has
// just been established, and the client will fetch them anyways // just been established, and the client will fetch them anyways
this.emit('update', { _: 'updatesTooLong' }) this.onUpdate.emit({ _: 'updatesTooLong' })
} }
this._salts.currentSalt = serverSalt this._salts.currentSalt = serverSalt
@ -1287,7 +1307,7 @@ export class SessionConnection extends PersistentConnection {
this._salts.isFetching = false this._salts.isFetching = false
this._salts.setFutureSalts(msg.salts.slice()) this._salts.setFutureSalts(msg.salts.slice())
this.emit('future-salts', msg.salts) this.onFutureSalts.emit(msg.salts)
} }
private _onDestroySessionResult(msg: mtp.TypeDestroySessionRes): void { private _onDestroySessionResult(msg: mtp.TypeDestroySessionRes): void {

View file

@ -19,6 +19,7 @@ import type {
PeersIndex, PeersIndex,
PollUpdate, PollUpdate,
PollVoteUpdate, PollVoteUpdate,
RawUpdateInfo,
StoryUpdate, StoryUpdate,
UserStatusUpdate, UserStatusUpdate,
UserTypingUpdate, UserTypingUpdate,
@ -270,8 +271,8 @@ export class Dispatcher<State extends object = never> {
* Dispatcher also uses bound client to throw errors * Dispatcher also uses bound client to throw errors
*/ */
bindToClient(client: TelegramClient): void { bindToClient(client: TelegramClient): void {
client.on('update', this.dispatchUpdate) client.onUpdate.add(this.dispatchUpdate)
client.on('raw_update', this.dispatchRawUpdate) client.onRawUpdate.add(this.dispatchRawUpdate)
this._client = client this._client = client
} }
@ -281,8 +282,8 @@ export class Dispatcher<State extends object = never> {
*/ */
unbind(): void { unbind(): void {
if (this._client) { if (this._client) {
this._client.off('update', this.dispatchUpdate) this._client.onUpdate.remove(this.dispatchUpdate)
this._client.off('raw_update', this.dispatchRawUpdate) this._client.onRawUpdate.remove(this.dispatchRawUpdate)
this._client = undefined this._client = undefined
} }
@ -322,7 +323,7 @@ export class Dispatcher<State extends object = never> {
* @param update Update to process * @param update Update to process
* @param peers Peers index * @param peers Peers index
*/ */
dispatchRawUpdate(update: tl.TypeUpdate | tl.TypeMessage, peers: PeersIndex): void { dispatchRawUpdate({ update, peers }: RawUpdateInfo): void {
if (!this._client) return if (!this._client) return
// order does not matter in the dispatcher, // order does not matter in the dispatcher,

View file

@ -17,9 +17,6 @@ importers:
'@fuman/jsr': '@fuman/jsr':
specifier: workspace:^ specifier: workspace:^
version: link:private/fuman/packages/jsr version: link:private/fuman/packages/jsr
'@fuman/utils':
specifier: workspace:^
version: link:private/fuman/packages/utils
'@types/deno': '@types/deno':
specifier: npm:@teidesu/deno-types@1.46.3 specifier: npm:@teidesu/deno-types@1.46.3
version: '@teidesu/deno-types@1.46.3' version: '@teidesu/deno-types@1.46.3'