chore: use fuman streams when uploading

This commit is contained in:
alina 🌸 2024-09-07 00:04:17 +03:00
parent eebf95c6ec
commit 4668b356db
Signed by: teidesu
SSH key fingerprint: SHA256:uNeCpw6aTSU4aIObXLvHfLkDa82HWH9EiOj9AXOIRpI
4 changed files with 28 additions and 218 deletions

View file

@ -1,4 +1,7 @@
import type { tl } from '@mtcute/tl'
import type { IReadable } from '@fuman/io'
import { read } from '@fuman/io'
import { AsyncLock } from '@fuman/utils'
import { getPlatform } from '../../../platform.js'
import { MtArgumentError } from '../../../types/errors.js'
@ -7,7 +10,7 @@ import type { ITelegramClient } from '../../client.types.js'
import type { UploadFileLike, UploadedFile } from '../../types/index.js'
import { MIME_TO_EXTENSION, guessFileMime } from '../../utils/file-type.js'
import { determinePartSize, isProbablyPlainText } from '../../utils/file-utils.js'
import { bufferToStream, createChunkedReader, streamToBuffer } from '../../utils/stream-utils.js'
import { bufferToStream } from '../../utils/stream-utils.js'
const OVERRIDE_MIME: Record<string, string> = {
// tg doesn't interpret `audio/opus` files as voice messages for some reason
@ -174,16 +177,20 @@ export async function uploadFile(
file = file.body
}
if (!(file instanceof ReadableStream)) {
if (file instanceof ReadableStream) {
file = read.async.fromWeb(file)
} else if (!(typeof file === 'object' && 'read' in file)) { // IReadable
throw new MtArgumentError('Could not convert input `file` to stream!')
}
const readable = file as IReadable
// set file size if not automatically inferred
if (fileSize === -1 && params.fileSize) fileSize = params.fileSize
if (fileSize === -1 && params.requireFileSize) {
// buffer the entire stream in memory, then convert it back to stream (bruh)
const buffer = await streamToBuffer(file)
const buffer = await read.async.untilEnd(readable)
fileSize = buffer.length
file = bufferToStream(buffer)
}
@ -227,35 +234,34 @@ export async function uploadFile(
)
const fileId = randomLong()
const stream = file
const lock = new AsyncLock()
let pos = 0
let idx = 0
const reader = createChunkedReader(stream, partSize)
const uploadNextPart = async (): Promise<void> => {
const thisIdx = idx++
let part = await reader.read()
if (!part && fileSize !== -1) {
throw new MtArgumentError(`Unexpected EOS (there were only ${idx - 1} parts, but expected ${partCount})`)
await lock.acquire()
let part
try {
part = await read.async.exactly(readable, partSize, 'truncate')
} finally {
lock.release()
}
if (fileSize === -1 && (reader.ended() || !part)) {
fileSize = pos + (part?.length ?? 0)
const ended = part.length < partSize
if (ended && fileSize !== -1 && thisIdx !== partCount - 1) {
throw new MtArgumentError(`Unexpected EOS (there were only ${idx} parts, but expected ${partCount})`)
}
if (ended && fileSize === -1) {
fileSize = pos + part.length
partCount = ~~((fileSize + partSize - 1) / partSize)
if (!part) part = new Uint8Array(0)
client.log.debug('readable ended, file size = %d, part count = %d', fileSize, partCount)
}
if (!ArrayBuffer.isView(part)) {
throw new MtArgumentError(`Part ${thisIdx} was not a Uint8Array!`)
}
if (part.length > partSize) {
throw new MtArgumentError(`Part ${thisIdx} had invalid size (expected ${partSize}, got ${part.length})`)
}
if (thisIdx === 0 && fileMime === undefined) {
const mime = guessFileMime(part)

View file

@ -1,6 +1,7 @@
/* eslint-disable no-restricted-globals */
import type { tdFileId } from '@mtcute/file-id'
import type { tl } from '@mtcute/tl'
import type { IReadable } from '@fuman/io'
import type { AnyToNever } from '../../../types/utils.js'
@ -27,6 +28,7 @@ export type UploadFileLike =
| File
| Blob
| string
| IReadable
| AnyToNever<import('node:fs').ReadStream>
| AnyToNever<ReadableStream<Uint8Array>>
| AnyToNever<NodeJS.ReadableStream>

View file

@ -1,83 +0,0 @@
import { describe, expect, it } from 'vitest'
import { createChunkedReader } from './stream-utils.js'
describe('createChunkedReader', () => {
it('should correctly handle chunks smaller than chunkSize', async () => {
const stream = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]))
controller.enqueue(new Uint8Array([4, 5, 6]))
controller.close()
},
})
const reader = createChunkedReader(stream, 4)
expect(await reader.read()).to.deep.equal(new Uint8Array([1, 2, 3, 4]))
expect(reader.ended()).toEqual(false)
expect(await reader.read()).to.deep.equal(new Uint8Array([5, 6]))
expect(reader.ended()).toEqual(true)
expect(await reader.read()).toEqual(null)
})
it('should correctly handle chunks larger than chunkSize', async () => {
const stream = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]))
controller.enqueue(new Uint8Array([4, 5, 6, 7]))
controller.close()
},
})
const reader = createChunkedReader(stream, 2)
expect(await reader.read()).to.deep.equal(new Uint8Array([1, 2]))
expect(reader.ended()).toEqual(false)
expect(await reader.read()).to.deep.equal(new Uint8Array([3, 4]))
expect(reader.ended()).toEqual(false)
expect(await reader.read()).to.deep.equal(new Uint8Array([5, 6]))
expect(reader.ended()).toEqual(false)
expect(await reader.read()).to.deep.equal(new Uint8Array([7]))
expect(reader.ended()).toEqual(true)
expect(await reader.read()).toEqual(null)
})
it('should correctly handle chunks equal to chunkSize', async () => {
const stream = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]))
controller.enqueue(new Uint8Array([4, 5, 6]))
controller.close()
},
})
const reader = createChunkedReader(stream, 3)
expect(await reader.read()).to.deep.equal(new Uint8Array([1, 2, 3]))
expect(reader.ended()).toEqual(false)
expect(await reader.read()).to.deep.equal(new Uint8Array([4, 5, 6]))
expect(reader.ended()).toEqual(true)
expect(await reader.read()).toEqual(null)
})
it('should correctly handle mixed chunks', async () => {
const stream = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]))
controller.enqueue(new Uint8Array([4, 5, 6, 7]))
controller.enqueue(new Uint8Array([8, 9]))
controller.enqueue(new Uint8Array([10, 11, 12, 13, 14]))
controller.close()
},
})
const reader = createChunkedReader(stream, 4)
expect(await reader.read()).to.deep.equal(new Uint8Array([1, 2, 3, 4]))
expect(reader.ended()).toEqual(false)
expect(await reader.read()).to.deep.equal(new Uint8Array([5, 6, 7, 8]))
expect(reader.ended()).toEqual(false)
expect(await reader.read()).to.deep.equal(new Uint8Array([9, 10, 11, 12]))
expect(reader.ended()).toEqual(false)
expect(await reader.read()).to.deep.equal(new Uint8Array([13, 14]))
expect(reader.ended()).toEqual(true)
expect(await reader.read()).toEqual(null)
})
})

View file

@ -1,4 +1,4 @@
import { AsyncLock, u8 } from '@fuman/utils'
import { u8 } from '@fuman/utils'
export function bufferToStream(buf: Uint8Array): ReadableStream<Uint8Array> {
return new ReadableStream({
@ -22,118 +22,3 @@ export async function streamToBuffer(stream: ReadableStream<Uint8Array>): Promis
return u8.concat(chunks)
}
/**
 * Wraps a `ReadableStream` in a reader that yields fixed-size chunks of
 * `chunkSize` bytes, re-slicing the stream's own (arbitrarily sized) chunks
 * as needed. The final chunk may be shorter than `chunkSize`; once the
 * stream is exhausted, `read()` resolves to `null`.
 *
 * `ended()` can report end-of-stream *before* the final `read()` returns
 * because the implementation keeps a one-chunk lookahead (see `next`).
 */
export function createChunkedReader(stream: ReadableStream<Uint8Array>, chunkSize: number): {
ended: () => boolean
read: () => Promise<Uint8Array | null>
} {
const reader = stream.getReader()
// serializes concurrent read() calls — see readLocked() below
const lock = new AsyncLock()
// leftover bytes from earlier stream chunks that didn't fill a whole chunkSize
const buffer: Uint8Array[] = []
let bufferLength = 0
// one-chunk lookahead: the stream chunk that the *next* readInner() call will
// hand out; `undefined` once the underlying stream has finished
let next: Uint8Array | undefined
let first = true
// Pulls the next raw chunk from the underlying stream while always staying
// one chunk ahead, so `ended()` can detect end-of-stream early.
async function readInner(): Promise<Uint8Array | undefined> {
const { value } = await reader.read()
if (first) {
first = false
// very first call: prime the lookahead with a second read
const { value: nextValue } = await reader.read()
next = nextValue
return value
}
// hand out the buffered lookahead, stash the freshly read chunk
const tmp = next
next = value
return tmp
}
// Core re-chunking logic: assembles exactly `chunkSize` bytes from the
// leftover buffer and/or fresh stream chunks, recursing when more data is
// needed. NOTE: not safe for concurrent calls — callers go through readLocked().
async function read(): Promise<Uint8Array | null> {
if (bufferLength > chunkSize) {
// enough bytes already buffered — serve entirely from the buffer
const chunks = []
let length = 0
while (length < chunkSize && buffer.length) {
const chunk = buffer.shift()!
length += chunk.length
chunks.push(chunk)
}
if (length > chunkSize) {
// last buffered chunk overshot — split it and push the tail back
const lastChunk = chunks.pop()!
const diff = length - chunkSize
chunks.push(lastChunk.subarray(0, lastChunk.length - diff))
buffer.unshift(lastChunk.subarray(lastChunk.length - diff))
length = chunkSize
}
if (length === chunkSize) {
bufferLength -= chunkSize
return u8.concat(chunks)
}
} else if (next === undefined && bufferLength > 0) {
// stream is over — flush whatever remains (may be shorter than chunkSize)
bufferLength = 0
return u8.concat(buffer)
}
const value = await readInner()
if (!value) return null
if (bufferLength > 0) {
// leftovers exist — append the fresh chunk and re-run the buffered path
buffer.push(value)
bufferLength += value.length
// to avoid code duplication
return read()
}
if (value.length > chunkSize) {
// fresh chunk too big — return one full chunk, buffer the tail
const rest = value.subarray(chunkSize)
buffer.push(rest)
bufferLength += rest.length
return value.subarray(0, chunkSize)
}
if (value.length === chunkSize) {
return value
}
// fresh chunk too small — buffer it and pull more data
buffer.push(value)
bufferLength += value.length
return read()
}
// Mutual-exclusion wrapper: only one read() may run at a time, otherwise the
// shared buffer/lookahead state would be corrupted. The error is captured and
// rethrown after release so the lock is never left held.
async function readLocked() {
await lock.acquire()
let res: Uint8Array | null = null
let err: Error | null = null
try {
res = await read()
} catch (e) {
err = e as Error
}
lock.release()
if (err) throw err
return res
}
return {
// true once all buffered bytes are consumed and no lookahead chunk is pending
ended: () => bufferLength === 0 && next === undefined,
read: readLocked,
}
}