From b9b2b9f6baef022347cb8c47c9ff658bcc668db8 Mon Sep 17 00:00:00 2001 From: teidesu <86301490+teidesu@users.noreply.github.com> Date: Sat, 10 Jul 2021 15:10:43 +0300 Subject: [PATCH] feat(dispatcher): Conversation class --- packages/dispatcher/src/conversation.ts | 516 ++++++++++++++++++++++++ packages/dispatcher/src/index.ts | 1 + 2 files changed, 517 insertions(+) create mode 100644 packages/dispatcher/src/conversation.ts diff --git a/packages/dispatcher/src/conversation.ts b/packages/dispatcher/src/conversation.ts new file mode 100644 index 00000000..18aaf907 --- /dev/null +++ b/packages/dispatcher/src/conversation.ts @@ -0,0 +1,516 @@ +import { Dispatcher } from './dispatcher' +import { + FormattedString, + InputMediaLike, + InputPeerLike, + MaybeAsync, + Message, + MtCuteArgumentError, + TelegramClient, + TimeoutError, + tl, +} from '@mtcute/client' +import { AsyncLock, getMarkedPeerId } from '@mtcute/core' +import { + ControllablePromise, + createControllablePromise, +} from '@mtcute/core/src/utils/controllable-promise' +import { HistoryReadUpdate } from './updates' + +interface OneWayLinkedListItem { + v: T + n?: OneWayLinkedListItem +} + +class Queue { + first?: OneWayLinkedListItem + last?: OneWayLinkedListItem + + length = 0 + + constructor (readonly limit = 0) { + } + + push(item: T): void { + const it: OneWayLinkedListItem = { v: item } + if (!this.first) { + this.first = this.last = it + } else { + this.last!.n = it + this.last = it + } + + this.length += 1 + + if (this.limit) { + while (this.first && this.length > this.limit) { + this.first = this.first.n + this.length -= 1 + } + } + } + + empty(): boolean { + return this.first === undefined + } + + peek(): T | undefined { + return this.first?.v + } + + pop(): T | undefined { + if (!this.first) return undefined + + const it = this.first + this.first = this.first.n + if (!this.first) this.last = undefined + + this.length -= 1 + return it.v + } + + removeBy(pred: (it: T) => boolean): void { + if (!this.first) return + + let prev: OneWayLinkedListItem | undefined = undefined + let it = this.first + while (it && !pred(it.v)) { + if (!it.n) return + + prev = it + it = it.n + } + + if (!it) return + + if (prev) { + prev.n = it.n + } else { + this.first = it.n + } + + if (!this.first) this.last = undefined + + this.length -= 1 + } + + clear(): void { + this.first = this.last = undefined + this.length = 0 + } +} + +interface QueuedHandler { + promise: ControllablePromise + check?: (update: T) => MaybeAsync + timeout?: NodeJS.Timeout +} + +export class Conversation { + private _inputPeer: tl.TypeInputPeer + private _chatId: number + private _client: TelegramClient + private _started = false + + private _lastMessage?: number + private _lastReceivedMessage?: number + + private _queuedNewMessage = new Queue>() + private _pendingNewMessages = new Queue() + private _lock = new AsyncLock() + + private _pendingEditMessage: Record> = {} + private _recentEdits = new Queue(10) + + private _pendingRead: Record> = {} + + constructor( + readonly dispatcher: Dispatcher, + readonly chat: InputPeerLike + ) { + this._onNewMessage = this._onNewMessage.bind(this) + this._onEditMessage = this._onEditMessage.bind(this) + this._onHistoryRead = this._onHistoryRead.bind(this) + } + + get inputPeer(): tl.TypeInputPeer { + if (!this._started) { + throw new MtCuteArgumentError("Conversation hasn't started yet") + } + + return this._inputPeer + } + + async start(): Promise { + if (this._started) return + + const client = this.dispatcher['_client'] + if (!client) { + throw new MtCuteArgumentError( + 'Dispatcher is not bound to a client!' + ) + } + + this._client = client + this._started = true + this._inputPeer = await client.resolvePeer(this.chat) + this._chatId = getMarkedPeerId(this._inputPeer) + + this.dispatcher.on('new_message', this._onNewMessage) + this.dispatcher.on('edit_message', this._onEditMessage) + this.dispatcher.on('history_read', this._onHistoryRead) + } + + stop(): void { + if (!this._started) return + + this.dispatcher.off('new_message', this._onNewMessage) + this.dispatcher.off('edit_message', this._onEditMessage) + this.dispatcher.off('history_read', this._onHistoryRead) + + // reset pending status + this._queuedNewMessage.clear() + this._pendingNewMessages.clear() + this._pendingEditMessage = {} + this._recentEdits.clear() + this._pendingRead = {} + + this._started = false + } + + /** + * Send a text message to this conversation. + * + * @param text Text of the message + * @param params + */ + async sendText( + text: string | FormattedString, + params?: Parameters[2] + ): ReturnType { + if (!this._started) { + throw new MtCuteArgumentError("Conversation hasn't started yet") + } + + const res = await this._client.sendText(this._inputPeer, text, params) + this._lastMessage = res.id + return res + } + + /** + * Send a media to this conversation. + * + * @param media Media to send + * @param params + */ + async sendMedia( + media: InputMediaLike | string, + params?: Parameters[2] + ): ReturnType { + if (!this._started) { + throw new MtCuteArgumentError("Conversation hasn't started yet") + } + + const res = await this._client.sendMedia(this._inputPeer, media, params) + this._lastMessage = res.id + return res + } + + /** + * Send a media group to this conversation. + * + * @param medias Medias to send + * @param params + */ + async sendMediaGroup( + medias: (InputMediaLike | string)[], + params?: Parameters[2] + ): ReturnType { + if (!this._started) { + throw new MtCuteArgumentError("Conversation hasn't started yet") + } + + const res = await this._client.sendMediaGroup( + this._inputPeer, + medias, + params + ) + this._lastMessage = res[res.length - 1].id + return res + } + + /** + * Mark the conversation as read up to a certain point. + * + * By default, reads until the last message. + * You can pass `null` to read the entire conversation, + * or pass message ID to read up until that ID. + */ + markRead(message?: number | null, clearMentions = true): Promise { + if (!this._started) { + throw new MtCuteArgumentError("Conversation hasn't started yet") + } + + if (message === null) { + message = 0 + } else if (message === undefined) { + message = this._lastMessage ?? 0 + } + + return this._client.readHistory(this._inputPeer, message, clearMentions) + } + + async with(handler: () => MaybeAsync): Promise { + await this.start() + + let err: unknown + let res: T + try { + res = await handler() + } catch (e) { + err = e + } + + this.stop() + + if (err) throw err + + return res! + } + + /** + * Wait for a new message in the conversation + * + * @param filter Filter for the handler. You can use any filter you can use for dispatcher + * @param timeout Timeout for the handler in ms. Pass `null` to disable. When the timeout is reached, `TimeoutError` is thrown. + */ + waitForNewMessage( + filter?: (msg: Message) => MaybeAsync, + timeout: number | null = 15000 + ): Promise { + if (!this._started) { + throw new MtCuteArgumentError("Conversation hasn't started yet") + } + + const promise = createControllablePromise() + + let timer: NodeJS.Timeout | undefined = undefined + if (timeout !== null) { + timer = setTimeout(() => { + promise.reject(new TimeoutError()) + this._queuedNewMessage.removeBy((it) => it.promise === promise) + }, timeout) + } + + this._queuedNewMessage.push({ + promise, + check: filter, + timeout: timer, + }) + + this._processPendingNewMessages() + + return promise + } + + waitForResponse( + filter?: (msg: Message) => MaybeAsync, + params?: { + message?: number + timeout?: number | null + } + ): Promise { + const msgId = params?.message ?? this._lastMessage ?? 0 + + const pred = filter + ? (msg: Message) => (msg.id > msgId ? filter(msg) : false) + : (msg: Message) => msg.id > msgId + + return this.waitForNewMessage(pred, params?.timeout) + } + + waitForReply( + filter?: (msg: Message) => MaybeAsync, + params?: { + message?: number + timeout?: number | null + } + ): Promise { + const msgId = params?.message ?? this._lastMessage + if (!msgId) + throw new MtCuteArgumentError( + 'Provide message for which to wait for reply for' + ) + + const pred = filter + ? (msg: Message) => + msg.replyToMessageId === msgId ? filter(msg) : false + : (msg: Message) => msg.replyToMessageId === msgId + + return this.waitForNewMessage(pred, params?.timeout) + } + + /** + * Wait for a message to be edited in the conversation. + * By defaults wait for the last message sent by the other party + * (at the moment) to be edited. + * + * Returns the edited message. + * + * @param filter Filter for the handler. You can use any filter you can use for dispatcher + * @param params + */ + async waitForEdit( + filter?: (msg: Message) => MaybeAsync, + params?: { + message?: number + timeout?: number | null + } + ): Promise { + if (!this._started) { + throw new MtCuteArgumentError("Conversation hasn't started yet") + } + + const msgId = params?.message ?? this._lastReceivedMessage + if (!msgId) { + throw new MtCuteArgumentError( + 'Provide message for which to wait for edit for' + ) + } + + const promise = createControllablePromise() + + let timer: NodeJS.Timeout | undefined = undefined + if (params?.timeout !== null) { + timer = setTimeout(() => { + promise.reject(new TimeoutError()) + delete this._pendingEditMessage[msgId] + }, params?.timeout ?? 15000) + } + + this._pendingEditMessage[msgId] = { + promise, + check: filter, + timeout: timer, + } + + this._processRecentEdits() + + return promise + } + + async waitForRead(message?: number, timeout: number | null = 15000): Promise { + if (!this._started) { + throw new MtCuteArgumentError("Conversation hasn't started yet") + } + + const msgId = message ?? this._lastMessage + if (!msgId) + throw new MtCuteArgumentError( + 'Provide message for which to wait for reply for' + ) + + // check if the message is already read + const dialog = await this._client.getPeerDialogs(this._inputPeer) + if (dialog.lastRead >= msgId) return + + const promise = createControllablePromise() + + let timer: NodeJS.Timeout | undefined = undefined + if (timeout !== null) { + timer = setTimeout(() => { + promise.reject(new TimeoutError()) + delete this._pendingRead[msgId] + }, timeout) + } + + this._pendingRead[msgId] = { + promise, + timeout: timer, + } + + return promise + } + + + private _onNewMessage(msg: Message) { + if (msg.chat.id !== this._chatId) return + + if (this._queuedNewMessage.empty()) { + this._pendingNewMessages.push(msg) + return + } + + const it = this._queuedNewMessage.peek()! + + // order does matter for new messages + this._lock.acquire().then(async () => { + try { + if (!it.check || (await it.check(msg))) { + if (it.timeout) clearTimeout(it.timeout) + it.promise.resolve(msg) + this._queuedNewMessage.pop() + } + } catch (e) { + this._client['_emitError'](e) + } + + this._lastMessage = this._lastReceivedMessage = msg.id + + // this func will *never* error, so no need to use .then + this._lock.release() + }) + } + + private _onEditMessage(msg: Message, fromRecent = false) { + if (msg.chat.id !== this._chatId) return + + const it = this._pendingEditMessage[msg.id] + if (!it && !fromRecent) { + this._recentEdits.push(msg) + return + } + + (async () => { + if (!it.check || (await it.check(msg))) { + if (it.timeout) clearTimeout(it.timeout) + it.promise.resolve(msg) + delete this._pendingEditMessage[msg.id] + } + })().catch((e) => this._client['_emitError'](e)) + } + + private _onHistoryRead(upd: HistoryReadUpdate) { + if (upd.chatId !== this._chatId) return + + const lastRead = upd.maxReadId + + Object.keys(this._pendingRead).forEach((msgId: any) => { + if (parseInt(msgId) <= lastRead) { + const it = this._pendingRead[msgId] + if (it.timeout) clearTimeout(it.timeout) + it.promise.resolve() + delete this._pendingRead[msgId] + } + }) + } + + private _processPendingNewMessages() { + if (this._pendingNewMessages.empty()) return + + let it + while ((it = this._pendingNewMessages.pop())) { + this._onNewMessage(it) + } + } + + private _processRecentEdits() { + if (this._recentEdits.empty()) return + + let it = this._recentEdits.first + do { + if (!it) break + this._onEditMessage(it.v, true) + } while ((it = it.n)) + } +} diff --git a/packages/dispatcher/src/index.ts b/packages/dispatcher/src/index.ts index a71cff97..c9c0a557 100644 --- a/packages/dispatcher/src/index.ts +++ b/packages/dispatcher/src/index.ts @@ -6,5 +6,6 @@ export * from './propagation' export * from './updates' export * from './wizard' export * from './callback-data-builder' +export * from './conversation' export { UpdateState, IStateStorage } from './state'