From a49b561916f4864b8e9229a98532d26dfca90631 Mon Sep 17 00:00:00 2001 From: alina sireneva Date: Tue, 21 Jan 2025 18:07:01 +0300 Subject: [PATCH] feat: downloading files from repl --- packages/worker/src/sw/download/client.ts | 108 ++++++++++++++++++++ packages/worker/src/sw/download/handler.ts | 38 +++++++ packages/worker/src/sw/download/protocol.ts | 14 +++ packages/worker/src/sw/download/source.ts | 44 ++++++++ packages/worker/src/sw/iframe/script.ts | 34 +++++- packages/worker/src/sw/main.ts | 11 ++ 6 files changed, 248 insertions(+), 1 deletion(-) create mode 100644 packages/worker/src/sw/download/client.ts create mode 100644 packages/worker/src/sw/download/handler.ts create mode 100644 packages/worker/src/sw/download/protocol.ts create mode 100644 packages/worker/src/sw/download/source.ts diff --git a/packages/worker/src/sw/download/client.ts b/packages/worker/src/sw/download/client.ts new file mode 100644 index 0000000..183810c --- /dev/null +++ b/packages/worker/src/sw/download/client.ts @@ -0,0 +1,108 @@ +import type { SwMessage } from '../main.ts' +import type { DownloadFileParams } from './handler.ts' +import type { SwDownloadMessage } from './protocol.ts' +import { asNonNull, Deferred } from '@fuman/utils' +import { nanoid } from 'nanoid' +import { SW_DOWNLOAD_OPCODE } from './protocol.ts' + +const { WRITE, CLOSE, PULL, ERROR, ABORT } = SW_DOWNLOAD_OPCODE + +class MessagePortSink implements UnderlyingSink { + controller!: WritableStreamDefaultController + + ready = new Deferred() + readyPending = false + + constructor( + readonly port: MessagePort, + readonly onAbort: (reason: unknown) => void, + ) { + port.onmessage = this._onMessage.bind(this) + this._resetReady() + } + + start(controller: WritableStreamDefaultController) { + this.controller = controller + // Apply initial backpressure + return this.ready.promise + } + + write(chunk: Uint8Array) { + const message = { type: WRITE, chunk } + + this.port.postMessage(message, [chunk.buffer]) + + // Assume backpressure after every write, until sender pulls + this._resetReady() + + // Apply backpressure + return this.ready.promise + } + + close() { + this.port.postMessage({ type: CLOSE }) + this.port.close() + } + + abort(reason: any) { + this.port.postMessage({ type: ABORT, reason }) + this.port.close() + this.onAbort(reason) + } + + _onMessage(event: MessageEvent) { + const message = event.data as SwDownloadMessage + if (message.type === PULL) this._resolveReady() + if (message.type === ERROR) this._onError(message.reason) + } + + _onError(reason: any) { + this.controller.error(reason) + this.port.close() + this._rejectReady(reason) + } + + _resetReady() { + this.ready = new Deferred() + this.readyPending = true + } + + _resolveReady() { + this.ready.resolve() + this.readyPending = false + } + + _rejectReady(reason: any) { + if (!this.readyPending) this._resetReady() + this.ready.promise.catch(() => {}) + this.ready.reject(reason) + this.readyPending = false + } +} + +export function createFileDownload( + options: DownloadFileParams, + onAbort: (reason: unknown) => void, +): WritableStream { + const sw = asNonNull(navigator.serviceWorker.controller) + const id = nanoid() + const channel = new MessageChannel() + + sw.postMessage({ + event: 'DOWNLOAD_FILE', + id, + port: channel.port1, + params: options, + } satisfies SwMessage, [channel.port1]) + + const iframe = document.createElement('iframe') + iframe.src = `/sw/download/${id}` + iframe.hidden = true + document.body.appendChild(iframe) + + iframe.addEventListener('load', () => { + iframe.remove() + }) + + return new WritableStream(new MessagePortSink(channel.port2, onAbort)) +} diff --git a/packages/worker/src/sw/download/handler.ts b/packages/worker/src/sw/download/handler.ts new file mode 100644 index 0000000..5014f1c --- /dev/null +++ b/packages/worker/src/sw/download/handler.ts @@ -0,0 +1,38 @@ +// roughly based on https://github.com/jimmywarting/native-file-system-adapter + +import { MessagePortSource } from './source.ts' + +export interface DownloadFileParams { + filename: string + size?: number +} + +const stateMap = new Map +}>() + +export function handleDownload(id: string): Response { + if (!stateMap.has(id)) return new Response(null, { status: 404 }) + const state = stateMap.get(id)! + stateMap.delete(id) + + // Make filename RFC5987 compatible + const fileName = encodeURIComponent(state.filename).replace(/['()]/g, escape).replace(/\*/g, '%2A') + return new Response(state.stream, { + headers: { + 'Content-Type': 'application/octet-stream; charset=utf-8', + 'Content-Disposition': `attachment; filename*=UTF-8''${fileName}`, + ...(state.size ? { 'Content-Length': state.size.toString() } : {}), + }, + }) +} + +export function handlePort(port: MessagePort, id: string, params: DownloadFileParams) { + stateMap.set(id, { + ...params, + stream: new ReadableStream( + new MessagePortSource(port), + new CountQueuingStrategy({ highWaterMark: 4 }), + ), + }) +} diff --git a/packages/worker/src/sw/download/protocol.ts b/packages/worker/src/sw/download/protocol.ts new file mode 100644 index 0000000..82dc192 --- /dev/null +++ b/packages/worker/src/sw/download/protocol.ts @@ -0,0 +1,14 @@ +export const SW_DOWNLOAD_OPCODE = { + WRITE: 0, + PULL: 0, + ERROR: 1, + ABORT: 1, + CLOSE: 2, + PING: 3, +} + +export interface SwDownloadMessage { + type: number + chunk?: Uint8Array + reason?: any +} diff --git a/packages/worker/src/sw/download/source.ts b/packages/worker/src/sw/download/source.ts new file mode 100644 index 0000000..58298a0 --- /dev/null +++ b/packages/worker/src/sw/download/source.ts @@ -0,0 +1,44 @@ +import type { SwDownloadMessage } from './protocol.ts' +import { SW_DOWNLOAD_OPCODE } from './protocol.ts' + +const { WRITE, PULL, ERROR, ABORT, CLOSE } = SW_DOWNLOAD_OPCODE + +export class MessagePortSource implements UnderlyingSource { + controller!: ReadableStreamController + + constructor(readonly port: MessagePort) { + this.port = port + this.port.onmessage = this.onMessage.bind(this) + } + + start(controller: ReadableStreamController) { + this.controller = controller + } + + pull() { + this.port.postMessage({ type: PULL }) + } + + cancel(reason: Error) { + // Firefox can notify a cancel event, chrome can't + // https://bugs.chromium.org/p/chromium/issues/detail?id=638494 + this.port.postMessage({ type: ERROR, reason: reason.message }) + this.port.close() + } + + onMessage(event: MessageEvent) { + const message = event.data as SwDownloadMessage + // enqueue() will call pull() if needed when there's no backpressure + if (message.type === WRITE) { + (this.controller as any).enqueue(message.chunk!) + } + if (message.type === ABORT) { + this.controller.error(message.reason) + this.port.close() + } + if (message.type === CLOSE) { + this.controller.close() + this.port.close() + } + } +} diff --git a/packages/worker/src/sw/iframe/script.ts b/packages/worker/src/sw/iframe/script.ts index d9e8171..3d5d358 100644 --- a/packages/worker/src/sw/iframe/script.ts +++ b/packages/worker/src/sw/iframe/script.ts @@ -1,7 +1,8 @@ import { asNonNull } from '@fuman/utils' -import { Long, TelegramClient } from '@mtcute/web' +import { FileLocation, Long, TelegramClient } from '@mtcute/web' import { nanoid } from 'nanoid' import { swInvokeMethodInner } from '../client-inner.ts' +import { createFileDownload } from '../download/client.ts' type ConnectionState = import('@mtcute/web').ConnectionState type TelegramClientOptions = import('@mtcute/web').TelegramClientOptions @@ -72,6 +73,37 @@ function initClient(accountId: string, verbose: boolean) { // eslint-disable-next-line no-console console.log('%c[UPDATE]%c %s: %o', 'color: #8d7041', 'color: unset', update.name, update.data) }) + + window.tg.downloadToFile = async (filename, input, params) => { + // todo: there should probably be a better way than this + let fileSize = params?.fileSize + if (!fileSize) { + if (input instanceof FileLocation) { + let locationInner = input.location + + if (typeof locationInner === 'function') { + locationInner = locationInner() + } + + if (ArrayBuffer.isView(locationInner)) { + fileSize = locationInner.byteLength + } else { + fileSize = input.fileSize + } + } + } + + const abortController = new AbortController() + const writable = createFileDownload( + { + filename, + size: fileSize, + }, + reason => abortController.abort(reason), + ) + + await window.tg.downloadAsStream(input, params).pipeTo(writable) + } } window.addEventListener('message', async ({ data }) => { diff --git a/packages/worker/src/sw/main.ts b/packages/worker/src/sw/main.ts index 8b967ae..64f8ca7 100644 --- a/packages/worker/src/sw/main.ts +++ b/packages/worker/src/sw/main.ts @@ -2,6 +2,7 @@ import { unknownToError } from '@fuman/utils' import { IS_SAFARI } from '../utils/env.ts' import { clearAvatarCache, handleAvatarRequest } from './avatar.ts' import { requestCache } from './cache.ts' +import { type DownloadFileParams, handleDownload, handlePort } from './download/handler.ts' import { clearCache, handleRuntimeRequest } from './runtime.ts' import { forgetAllScripts, forgetScript, uploadScript } from './scripts.ts' @@ -17,6 +18,11 @@ async function handleSwRequest(_req: Request, url: URL): Promise { return handleRuntimeRequest(url) } + if (url.pathname.startsWith('/sw/download/')) { + const id = url.pathname.split('/')[3] + return handleDownload(id) + } + return new Response('Not Found', { status: 404 }) } @@ -58,6 +64,7 @@ export type SwMessage = | { event: 'FORGET_SCRIPT', name: string } | { event: 'CLEAR_AVATAR_CACHE', accountId: string } | { event: 'CLEAR_CACHE' } + | { event: 'DOWNLOAD_FILE', id: string, params: DownloadFileParams, port: MessagePort } async function handleMessage(msg: SwMessage) { switch (msg.event) { @@ -83,6 +90,10 @@ async function handleMessage(msg: SwMessage) { self.onmessage = async (event) => { const msg = event.data as SwMessage & { id: number } + if (msg.event === 'DOWNLOAD_FILE') { + return handlePort(msg.port, msg.id, msg.params) + } + try { const result = await handleMessage(msg) event.source!.postMessage({ id: msg.id, result })