fix(core): properly handle uploads
This commit is contained in:
parent
67758ca246
commit
a711ebaa1d
3 changed files with 27 additions and 23 deletions
|
@ -1,12 +1,11 @@
|
||||||
import type { ITelegramClient } from '../../client.types.js'
|
import type { ITelegramClient } from '../../client.types.js'
|
||||||
import type { FileDownloadLocation, FileDownloadParameters } from '../../types/index.js'
|
import type { FileDownloadLocation, FileDownloadParameters } from '../../types/index.js'
|
||||||
import { FileLocation } from '../../types/index.js'
|
import { FileLocation } from '../../types/index.js'
|
||||||
import { bufferToStream } from '../../utils/stream-utils.js'
|
|
||||||
|
|
||||||
import { downloadAsIterable } from './download-iterable.js'
|
import { downloadAsIterable } from './download-iterable.js'
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Download a file and return it as a readable stream,
|
* Download a file and return it as a `@fuman/io` stream,
|
||||||
* streaming file contents.
|
* streaming file contents.
|
||||||
*
|
*
|
||||||
* @param params File download parameters
|
* @param params File download parameters
|
||||||
|
@ -17,7 +16,13 @@ export function downloadAsStream(
|
||||||
params?: FileDownloadParameters,
|
params?: FileDownloadParameters,
|
||||||
): ReadableStream<Uint8Array> {
|
): ReadableStream<Uint8Array> {
|
||||||
if (location instanceof FileLocation && ArrayBuffer.isView(location.location)) {
|
if (location instanceof FileLocation && ArrayBuffer.isView(location.location)) {
|
||||||
return bufferToStream(location.location)
|
const buf = location.location
|
||||||
|
return new ReadableStream({
|
||||||
|
start(controller) {
|
||||||
|
controller.enqueue(buf)
|
||||||
|
controller.close()
|
||||||
|
},
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
const cancel = new AbortController()
|
const cancel = new AbortController()
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
import type { tl } from '@mtcute/tl'
|
import type { tl } from '@mtcute/tl'
|
||||||
import type { IReadable } from '@fuman/io'
|
import type { IReadable } from '@fuman/io'
|
||||||
import { read, webReadableToFuman } from '@fuman/io'
|
import { Bytes, read, webReadableToFuman } from '@fuman/io'
|
||||||
import { AsyncLock } from '@fuman/utils'
|
import { AsyncLock } from '@fuman/utils'
|
||||||
|
|
||||||
import { MtArgumentError } from '../../../types/errors.js'
|
import { MtArgumentError } from '../../../types/errors.js'
|
||||||
|
@ -9,7 +9,6 @@ import type { ITelegramClient } from '../../client.types.js'
|
||||||
import type { UploadFileLike, UploadedFile } from '../../types/index.js'
|
import type { UploadFileLike, UploadedFile } from '../../types/index.js'
|
||||||
import { MIME_TO_EXTENSION, guessFileMime } from '../../utils/file-type.js'
|
import { MIME_TO_EXTENSION, guessFileMime } from '../../utils/file-type.js'
|
||||||
import { determinePartSize, isProbablyPlainText } from '../../utils/file-utils.js'
|
import { determinePartSize, isProbablyPlainText } from '../../utils/file-utils.js'
|
||||||
import { bufferToStream } from '../../utils/stream-utils.js'
|
|
||||||
|
|
||||||
const OVERRIDE_MIME: Record<string, string> = {
|
const OVERRIDE_MIME: Record<string, string> = {
|
||||||
// tg doesn't interpret `audio/opus` files as voice messages for some reason
|
// tg doesn't interpret `audio/opus` files as voice messages for some reason
|
||||||
|
@ -131,7 +130,7 @@ export async function uploadFile(
|
||||||
|
|
||||||
if (ArrayBuffer.isView(file)) {
|
if (ArrayBuffer.isView(file)) {
|
||||||
fileSize = file.length
|
fileSize = file.length
|
||||||
file = bufferToStream(file)
|
file = Bytes.from(file)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (HAS_FILE && file instanceof File) {
|
if (HAS_FILE && file instanceof File) {
|
||||||
|
@ -190,7 +189,7 @@ export async function uploadFile(
|
||||||
throw new MtArgumentError('Could not convert input `file` to stream!')
|
throw new MtArgumentError('Could not convert input `file` to stream!')
|
||||||
}
|
}
|
||||||
|
|
||||||
const readable = file as IReadable
|
let readable = file as IReadable
|
||||||
|
|
||||||
// set file size if not automatically inferred
|
// set file size if not automatically inferred
|
||||||
if (fileSize === -1 && params.fileSize) fileSize = params.fileSize
|
if (fileSize === -1 && params.fileSize) fileSize = params.fileSize
|
||||||
|
@ -199,7 +198,7 @@ export async function uploadFile(
|
||||||
// buffer the entire stream in memory, then convert it back to stream (bruh)
|
// buffer the entire stream in memory, then convert it back to stream (bruh)
|
||||||
const buffer = await read.async.untilEnd(readable)
|
const buffer = await read.async.untilEnd(readable)
|
||||||
fileSize = buffer.length
|
fileSize = buffer.length
|
||||||
file = bufferToStream(buffer)
|
readable = Bytes.from(buffer)
|
||||||
}
|
}
|
||||||
|
|
||||||
let partSizeKb = params.partSize
|
let partSizeKb = params.partSize
|
||||||
|
@ -245,11 +244,18 @@ export async function uploadFile(
|
||||||
|
|
||||||
let pos = 0
|
let pos = 0
|
||||||
let idx = 0
|
let idx = 0
|
||||||
|
let readableEnded = false
|
||||||
|
|
||||||
const uploadNextPart = async (): Promise<void> => {
|
const uploadNextPart = async (): Promise<void> => {
|
||||||
const thisIdx = idx++
|
const thisIdx = idx++
|
||||||
|
|
||||||
await lock.acquire()
|
await lock.acquire()
|
||||||
|
|
||||||
|
if (readableEnded) {
|
||||||
|
lock.release()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
let part
|
let part
|
||||||
try {
|
try {
|
||||||
part = await read.async.exactly(readable, partSize, 'truncate')
|
part = await read.async.exactly(readable, partSize, 'truncate')
|
||||||
|
@ -258,14 +264,16 @@ export async function uploadFile(
|
||||||
}
|
}
|
||||||
|
|
||||||
const ended = part.length < partSize
|
const ended = part.length < partSize
|
||||||
|
if (ended) {
|
||||||
|
readableEnded = true
|
||||||
|
|
||||||
if (ended && fileSize !== -1 && thisIdx !== partCount - 1) {
|
if (fileSize === -1) {
|
||||||
throw new MtArgumentError(`Unexpected EOS (there were only ${idx} parts, but expected ${partCount})`)
|
fileSize = pos + part.length
|
||||||
}
|
partCount = ~~((fileSize + partSize - 1) / partSize)
|
||||||
|
} else if (thisIdx !== partCount - 1) {
|
||||||
|
throw new MtArgumentError(`Unexpected EOS (there were only ${idx} parts, but expected ${partCount})`)
|
||||||
|
}
|
||||||
|
|
||||||
if (ended && fileSize === -1) {
|
|
||||||
fileSize = pos + part.length
|
|
||||||
partCount = ~~((fileSize + partSize - 1) / partSize)
|
|
||||||
client.log.debug('readable ended, file size = %d, part count = %d', fileSize, partCount)
|
client.log.debug('readable ended, file size = %d, part count = %d', fileSize, partCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,14 +1,5 @@
|
||||||
import { u8 } from '@fuman/utils'
|
import { u8 } from '@fuman/utils'
|
||||||
|
|
||||||
export function bufferToStream(buf: Uint8Array): ReadableStream<Uint8Array> {
|
|
||||||
return new ReadableStream({
|
|
||||||
start(controller) {
|
|
||||||
controller.enqueue(buf)
|
|
||||||
controller.close()
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function streamToBuffer(stream: ReadableStream<Uint8Array>): Promise<Uint8Array> {
|
export async function streamToBuffer(stream: ReadableStream<Uint8Array>): Promise<Uint8Array> {
|
||||||
const chunks: Uint8Array[] = []
|
const chunks: Uint8Array[] = []
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue