diff --git a/packages/core/src/highlevel/methods/files/download-stream.ts b/packages/core/src/highlevel/methods/files/download-stream.ts index f4fdf277..54743072 100644 --- a/packages/core/src/highlevel/methods/files/download-stream.ts +++ b/packages/core/src/highlevel/methods/files/download-stream.ts @@ -1,12 +1,11 @@ import type { ITelegramClient } from '../../client.types.js' import type { FileDownloadLocation, FileDownloadParameters } from '../../types/index.js' import { FileLocation } from '../../types/index.js' -import { bufferToStream } from '../../utils/stream-utils.js' import { downloadAsIterable } from './download-iterable.js' /** - * Download a file and return it as a readable stream, + * Download a file and return it as a `@fuman/io` stream, * streaming file contents. * * @param params File download parameters @@ -17,7 +16,13 @@ export function downloadAsStream( params?: FileDownloadParameters, ): ReadableStream { if (location instanceof FileLocation && ArrayBuffer.isView(location.location)) { - return bufferToStream(location.location) + const buf = location.location + return new ReadableStream({ + start(controller) { + controller.enqueue(buf) + controller.close() + }, + }) } const cancel = new AbortController() diff --git a/packages/core/src/highlevel/methods/files/upload-file.ts b/packages/core/src/highlevel/methods/files/upload-file.ts index 2b585cfe..4c633120 100644 --- a/packages/core/src/highlevel/methods/files/upload-file.ts +++ b/packages/core/src/highlevel/methods/files/upload-file.ts @@ -1,6 +1,6 @@ import type { tl } from '@mtcute/tl' import type { IReadable } from '@fuman/io' -import { read, webReadableToFuman } from '@fuman/io' +import { Bytes, read, webReadableToFuman } from '@fuman/io' import { AsyncLock } from '@fuman/utils' import { MtArgumentError } from '../../../types/errors.js' @@ -9,7 +9,6 @@ import type { ITelegramClient } from '../../client.types.js' import type { UploadFileLike, UploadedFile } from '../../types/index.js' import { MIME_TO_EXTENSION, guessFileMime } from '../../utils/file-type.js' import { determinePartSize, isProbablyPlainText } from '../../utils/file-utils.js' -import { bufferToStream } from '../../utils/stream-utils.js' const OVERRIDE_MIME: Record = { // tg doesn't interpret `audio/opus` files as voice messages for some reason @@ -131,7 +130,7 @@ export async function uploadFile( if (ArrayBuffer.isView(file)) { fileSize = file.length - file = bufferToStream(file) + file = Bytes.from(file) } if (HAS_FILE && file instanceof File) { @@ -190,7 +189,7 @@ export async function uploadFile( throw new MtArgumentError('Could not convert input `file` to stream!') } - const readable = file as IReadable + let readable = file as IReadable // set file size if not automatically inferred if (fileSize === -1 && params.fileSize) fileSize = params.fileSize @@ -199,7 +198,7 @@ export async function uploadFile( // buffer the entire stream in memory, then convert it back to stream (bruh) const buffer = await read.async.untilEnd(readable) fileSize = buffer.length - file = bufferToStream(buffer) + readable = Bytes.from(buffer) } let partSizeKb = params.partSize @@ -245,11 +244,18 @@ export async function uploadFile( let pos = 0 let idx = 0 + let readableEnded = false const uploadNextPart = async (): Promise => { const thisIdx = idx++ await lock.acquire() + + if (readableEnded) { + lock.release() + return + } + let part try { part = await read.async.exactly(readable, partSize, 'truncate') @@ -258,14 +264,16 @@ export async function uploadFile( } const ended = part.length < partSize + if (ended) { + readableEnded = true - if (ended && fileSize !== -1 && thisIdx !== partCount - 1) { - throw new MtArgumentError(`Unexpected EOS (there were only ${idx} parts, but expected ${partCount})`) - } + if (fileSize === -1) { + fileSize = pos + part.length + partCount = ~~((fileSize + partSize - 1) / partSize) + } else if (thisIdx !== partCount - 1) { + throw new MtArgumentError(`Unexpected EOS (there were only ${idx} parts, but expected ${partCount})`) + } - if (ended && fileSize === -1) { - fileSize = pos + part.length - partCount = ~~((fileSize + partSize - 1) / partSize) client.log.debug('readable ended, file size = %d, part count = %d', fileSize, partCount) } diff --git a/packages/core/src/highlevel/utils/stream-utils.ts b/packages/core/src/highlevel/utils/stream-utils.ts index e8c11cbb..47f42a06 100644 --- a/packages/core/src/highlevel/utils/stream-utils.ts +++ b/packages/core/src/highlevel/utils/stream-utils.ts @@ -1,14 +1,5 @@ import { u8 } from '@fuman/utils' -export function bufferToStream(buf: Uint8Array): ReadableStream { - return new ReadableStream({ - start(controller) { - controller.enqueue(buf) - controller.close() - }, - }) -} - export async function streamToBuffer(stream: ReadableStream): Promise { const chunks: Uint8Array[] = []