diff --git a/packages/core/src/highlevel/methods/files/upload-file.ts b/packages/core/src/highlevel/methods/files/upload-file.ts index 8ed1d269..113d2a40 100644 --- a/packages/core/src/highlevel/methods/files/upload-file.ts +++ b/packages/core/src/highlevel/methods/files/upload-file.ts @@ -1,4 +1,7 @@ import type { tl } from '@mtcute/tl' +import type { IReadable } from '@fuman/io' +import { read } from '@fuman/io' +import { AsyncLock } from '@fuman/utils' import { getPlatform } from '../../../platform.js' import { MtArgumentError } from '../../../types/errors.js' @@ -7,7 +10,7 @@ import type { ITelegramClient } from '../../client.types.js' import type { UploadFileLike, UploadedFile } from '../../types/index.js' import { MIME_TO_EXTENSION, guessFileMime } from '../../utils/file-type.js' import { determinePartSize, isProbablyPlainText } from '../../utils/file-utils.js' -import { bufferToStream, createChunkedReader, streamToBuffer } from '../../utils/stream-utils.js' +import { bufferToStream } from '../../utils/stream-utils.js' const OVERRIDE_MIME: Record = { // tg doesn't interpret `audio/opus` files as voice messages for some reason @@ -174,16 +177,20 @@ export async function uploadFile( file = file.body } - if (!(file instanceof ReadableStream)) { + if (file instanceof ReadableStream) { + file = read.async.fromWeb(file) + } else if (!(typeof file === 'object' && 'read' in file)) { // IReadable throw new MtArgumentError('Could not convert input `file` to stream!') } + const readable = file as IReadable + // set file size if not automatically inferred if (fileSize === -1 && params.fileSize) fileSize = params.fileSize if (fileSize === -1 && params.requireFileSize) { // buffer the entire stream in memory, then convert it back to stream (bruh) - const buffer = await streamToBuffer(file) + const buffer = await read.async.untilEnd(readable) fileSize = buffer.length file = bufferToStream(buffer) } @@ -227,35 +234,34 @@ export async function uploadFile( ) const fileId = randomLong() - const stream = file + const lock = new AsyncLock() let pos = 0 let idx = 0 - const reader = createChunkedReader(stream, partSize) const uploadNextPart = async (): Promise => { const thisIdx = idx++ - let part = await reader.read() - - if (!part && fileSize !== -1) { - throw new MtArgumentError(`Unexpected EOS (there were only ${idx - 1} parts, but expected ${partCount})`) + await lock.acquire() + let part + try { + part = await read.async.exactly(readable, partSize, 'truncate') + } finally { + lock.release() } - if (fileSize === -1 && (reader.ended() || !part)) { - fileSize = pos + (part?.length ?? 0) + const ended = part.length < partSize + + if (ended && fileSize !== -1 && thisIdx !== partCount - 1) { + throw new MtArgumentError(`Unexpected EOS (there were only ${idx} parts, but expected ${partCount})`) + } + + if (ended && fileSize === -1) { + fileSize = pos + part.length partCount = ~~((fileSize + partSize - 1) / partSize) - if (!part) part = new Uint8Array(0) client.log.debug('readable ended, file size = %d, part count = %d', fileSize, partCount) } - if (!ArrayBuffer.isView(part)) { - throw new MtArgumentError(`Part ${thisIdx} was not a Uint8Array!`) - } - if (part.length > partSize) { - throw new MtArgumentError(`Part ${thisIdx} had invalid size (expected ${partSize}, got ${part.length})`) - } - if (thisIdx === 0 && fileMime === undefined) { const mime = guessFileMime(part) diff --git a/packages/core/src/highlevel/types/files/utils.ts b/packages/core/src/highlevel/types/files/utils.ts index 705baac4..c7a76f27 100644 --- a/packages/core/src/highlevel/types/files/utils.ts +++ b/packages/core/src/highlevel/types/files/utils.ts @@ -1,6 +1,7 @@ /* eslint-disable no-restricted-globals */ import type { tdFileId } from '@mtcute/file-id' import type { tl } from '@mtcute/tl' +import type { IReadable } from '@fuman/io' import type { AnyToNever } from '../../../types/utils.js' @@ -27,6 +28,7 @@ export type UploadFileLike = | File | Blob | string + | IReadable | AnyToNever | AnyToNever> | AnyToNever diff --git a/packages/core/src/highlevel/utils/stream-utils.test.ts b/packages/core/src/highlevel/utils/stream-utils.test.ts deleted file mode 100644 index 1384ec94..00000000 --- a/packages/core/src/highlevel/utils/stream-utils.test.ts +++ /dev/null @@ -1,83 +0,0 @@ -import { describe, expect, it } from 'vitest' - -import { createChunkedReader } from './stream-utils.js' - -describe('createChunkedReader', () => { - it('should correctly handle chunks smaller than chunkSize', async () => { - const stream = new ReadableStream({ - start(controller) { - controller.enqueue(new Uint8Array([1, 2, 3])) - controller.enqueue(new Uint8Array([4, 5, 6])) - controller.close() - }, - }) - const reader = createChunkedReader(stream, 4) - - expect(await reader.read()).to.deep.equal(new Uint8Array([1, 2, 3, 4])) - expect(reader.ended()).toEqual(false) - expect(await reader.read()).to.deep.equal(new Uint8Array([5, 6])) - expect(reader.ended()).toEqual(true) - expect(await reader.read()).toEqual(null) - }) - - it('should correctly handle chunks larger than chunkSize', async () => { - const stream = new ReadableStream({ - start(controller) { - controller.enqueue(new Uint8Array([1, 2, 3])) - controller.enqueue(new Uint8Array([4, 5, 6, 7])) - controller.close() - }, - }) - const reader = createChunkedReader(stream, 2) - - expect(await reader.read()).to.deep.equal(new Uint8Array([1, 2])) - expect(reader.ended()).toEqual(false) - expect(await reader.read()).to.deep.equal(new Uint8Array([3, 4])) - expect(reader.ended()).toEqual(false) - expect(await reader.read()).to.deep.equal(new Uint8Array([5, 6])) - expect(reader.ended()).toEqual(false) - expect(await reader.read()).to.deep.equal(new Uint8Array([7])) - expect(reader.ended()).toEqual(true) - expect(await reader.read()).toEqual(null) - }) - - it('should correctly handle chunks equal to chunkSize', async () => { - const stream = new ReadableStream({ - start(controller) { - controller.enqueue(new Uint8Array([1, 2, 3])) - controller.enqueue(new Uint8Array([4, 5, 6])) - controller.close() - }, - }) - const reader = createChunkedReader(stream, 3) - - expect(await reader.read()).to.deep.equal(new Uint8Array([1, 2, 3])) - expect(reader.ended()).toEqual(false) - expect(await reader.read()).to.deep.equal(new Uint8Array([4, 5, 6])) - expect(reader.ended()).toEqual(true) - expect(await reader.read()).toEqual(null) - }) - - it('should correctly handle mixed chunks', async () => { - const stream = new ReadableStream({ - start(controller) { - controller.enqueue(new Uint8Array([1, 2, 3])) - controller.enqueue(new Uint8Array([4, 5, 6, 7])) - controller.enqueue(new Uint8Array([8, 9])) - controller.enqueue(new Uint8Array([10, 11, 12, 13, 14])) - controller.close() - }, - }) - const reader = createChunkedReader(stream, 4) - - expect(await reader.read()).to.deep.equal(new Uint8Array([1, 2, 3, 4])) - expect(reader.ended()).toEqual(false) - expect(await reader.read()).to.deep.equal(new Uint8Array([5, 6, 7, 8])) - expect(reader.ended()).toEqual(false) - expect(await reader.read()).to.deep.equal(new Uint8Array([9, 10, 11, 12])) - expect(reader.ended()).toEqual(false) - expect(await reader.read()).to.deep.equal(new Uint8Array([13, 14])) - expect(reader.ended()).toEqual(true) - expect(await reader.read()).toEqual(null) - }) -}) diff --git a/packages/core/src/highlevel/utils/stream-utils.ts b/packages/core/src/highlevel/utils/stream-utils.ts index 5110449f..e8c11cbb 100644 --- a/packages/core/src/highlevel/utils/stream-utils.ts +++ b/packages/core/src/highlevel/utils/stream-utils.ts @@ -1,4 +1,4 @@ -import { AsyncLock, u8 } from '@fuman/utils' +import { u8 } from '@fuman/utils' export function bufferToStream(buf: Uint8Array): ReadableStream { return new ReadableStream({ @@ -22,118 +22,3 @@ export async function streamToBuffer(stream: ReadableStream): Promis return u8.concat(chunks) } - -export function createChunkedReader(stream: ReadableStream, chunkSize: number): { - ended: () => boolean - read: () => Promise -} { - const reader = stream.getReader() - const lock = new AsyncLock() - - const buffer: Uint8Array[] = [] - let bufferLength = 0 - - let next: Uint8Array | undefined - let first = true - - async function readInner(): Promise { - const { value } = await reader.read() - - if (first) { - first = false - const { value: nextValue } = await reader.read() - - next = nextValue - - return value - } - - const tmp = next - next = value - - return tmp - } - - async function read(): Promise { - if (bufferLength > chunkSize) { - const chunks = [] - let length = 0 - - while (length < chunkSize && buffer.length) { - const chunk = buffer.shift()! - length += chunk.length - chunks.push(chunk) - } - - if (length > chunkSize) { - const lastChunk = chunks.pop()! - const diff = length - chunkSize - chunks.push(lastChunk.subarray(0, lastChunk.length - diff)) - buffer.unshift(lastChunk.subarray(lastChunk.length - diff)) - length = chunkSize - } - - if (length === chunkSize) { - bufferLength -= chunkSize - - return u8.concat(chunks) - } - } else if (next === undefined && bufferLength > 0) { - bufferLength = 0 - - return u8.concat(buffer) - } - - const value = await readInner() - if (!value) return null - - if (bufferLength > 0) { - buffer.push(value) - bufferLength += value.length - - // to avoid code duplication - return read() - } - - if (value.length > chunkSize) { - const rest = value.subarray(chunkSize) - buffer.push(rest) - bufferLength += rest.length - - return value.subarray(0, chunkSize) - } - - if (value.length === chunkSize) { - return value - } - - buffer.push(value) - bufferLength += value.length - - return read() - } - - async function readLocked() { - await lock.acquire() - - let res: Uint8Array | null = null - let err: Error | null = null - - try { - res = await read() - } catch (e) { - err = e as Error - } - - lock.release() - - if (err) throw err - - return res - } - - return { - ended: () => bufferLength === 0 && next === undefined, - read: readLocked, - } -}