feat(core): websocket transport
untested in browser (yet). also had to slightly refactor stuff
This commit is contained in:
parent
9e681cb13f
commit
69270a66a2
7 changed files with 269 additions and 9 deletions
|
@ -6,10 +6,12 @@ import {
|
|||
ControllablePromise,
|
||||
createControllablePromise,
|
||||
} from '../utils/controllable-promise'
|
||||
import { ICryptoProvider } from '../utils/crypto'
|
||||
|
||||
const debug = require('debug')('mtcute:conn')
|
||||
|
||||
export interface PersistentConnectionParams {
|
||||
crypto: ICryptoProvider
|
||||
transportFactory: TransportFactory
|
||||
dc: tl.RawDcOption
|
||||
reconnectionStrategy: ReconnectionStrategy<PersistentConnectionParams>
|
||||
|
@ -52,6 +54,7 @@ export abstract class PersistentConnection extends EventEmitter {
|
|||
super()
|
||||
this.params = params
|
||||
this._transport = params.transportFactory()
|
||||
this._transport.setupCrypto?.(params.crypto)
|
||||
|
||||
this._transport.on('ready', this.onTransportReady.bind(this))
|
||||
this._transport.on('message', this.onTransportMessage.bind(this))
|
||||
|
|
|
@ -4,7 +4,6 @@ import {
|
|||
} from './persistent-connection'
|
||||
import { TransportError } from './transports'
|
||||
import { tl } from '@mtcute/tl'
|
||||
import { ICryptoProvider } from '../utils/crypto'
|
||||
import { doAuthorization } from './authorization'
|
||||
import { MtprotoSession } from './mtproto-session'
|
||||
import { BinaryWriter } from '../utils/binary/binary-writer'
|
||||
|
@ -29,7 +28,6 @@ _debug.formatters.h = (v: Buffer): string => v.toString('hex')
|
|||
_debug.formatters.b = (v: any): string => !!v + ''
|
||||
|
||||
export interface TelegramConnectionParams extends PersistentConnectionParams {
|
||||
crypto: ICryptoProvider
|
||||
initConnection: tl.RawInitConnectionRequest
|
||||
inactivityTimeout?: number
|
||||
niceStacks?: boolean
|
||||
|
@ -63,7 +61,6 @@ interface PendingMessage {
|
|||
// TODO: error handling basically everywhere, most importantly (de-)serialization errors
|
||||
// noinspection JSUnusedLocalSymbols
|
||||
export class TelegramConnection extends PersistentConnection {
|
||||
private readonly _crypto: ICryptoProvider
|
||||
readonly params: TelegramConnectionParams
|
||||
|
||||
private readonly _mtproto: MtprotoSession
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import { tl } from '@mtcute/tl'
|
||||
import { MaybeAsync } from '../../types/utils'
|
||||
import { ICryptoProvider } from '../../utils/crypto'
|
||||
|
||||
export enum TransportState {
|
||||
/**
|
||||
|
@ -55,6 +56,12 @@ export interface ICuteTransport {
|
|||
close(): void
|
||||
/** send a message */
|
||||
send(data: Buffer): Promise<void>
|
||||
|
||||
/**
|
||||
* For transports whose codecs use crypto functions.
|
||||
* This method is called before any other.
|
||||
*/
|
||||
setupCrypto?(crypto: ICryptoProvider): void
|
||||
}
|
||||
|
||||
/** Transport factory function */
|
||||
|
@ -84,6 +91,12 @@ export interface PacketCodec {
|
|||
on(event: 'error', handler: (error: Error) => void): void
|
||||
/** Emitted when a full packet has been processed. */
|
||||
on(event: 'packet', handler: (packet: Buffer) => void): void
|
||||
|
||||
/**
|
||||
* For codecs that use crypto functions.
|
||||
* This method is called before any other.
|
||||
*/
|
||||
setupCrypto?(crypto: ICryptoProvider): void
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -2,8 +2,6 @@ import { TransportFactory } from './abstract'
|
|||
|
||||
export * from './abstract'
|
||||
export * from './streamed'
|
||||
export * from './tcp'
|
||||
export * from './tcp-intermediate'
|
||||
|
||||
/** Platform-defined default transport factory */
|
||||
export let defaultTransportFactory: TransportFactory
|
||||
|
@ -22,9 +20,7 @@ if (typeof process !== 'undefined') {
|
|||
)
|
||||
}
|
||||
} else {
|
||||
// TODO: implement websocket transport
|
||||
throw new Error('WebSocket is not supported (yet)!')
|
||||
// const { WebSocketTransport } = require('./websocket')
|
||||
// defaultTransportFactory = () => new WebSocketTransport()
|
||||
const { WebSocketObfuscatedTransport } = require('./ws-obfuscated')
|
||||
defaultTransportFactory = () => new WebSocketObfuscatedTransport()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ import { ICuteTransport, PacketCodec, TransportState } from './abstract'
|
|||
import { tl } from '@mtcute/tl'
|
||||
import { Socket, connect } from 'net'
|
||||
import EventEmitter from 'events'
|
||||
import { ICryptoProvider } from '../../utils/crypto'
|
||||
|
||||
const debug = require('debug')('mtcute:tcp')
|
||||
|
||||
|
@ -17,8 +18,14 @@ export abstract class TcpTransport
|
|||
private _socket: Socket | null = null
|
||||
|
||||
abstract _packetCodec: PacketCodec
|
||||
private _crypto: ICryptoProvider
|
||||
|
||||
packetCodecInitialized = false
|
||||
|
||||
setupCrypto(crypto: ICryptoProvider): void {
|
||||
this._crypto = crypto
|
||||
}
|
||||
|
||||
state(): TransportState {
|
||||
return this._state
|
||||
}
|
||||
|
|
151
packages/core/src/network/transports/websocket.ts
Normal file
151
packages/core/src/network/transports/websocket.ts
Normal file
|
@ -0,0 +1,151 @@
|
|||
import { ICuteTransport, PacketCodec, TransportState } from './abstract'
|
||||
import { tl } from '@mtcute/tl'
|
||||
import EventEmitter from 'events'
|
||||
import { typedArrayToBuffer } from '../../utils/buffer-utils'
|
||||
import { ICryptoProvider } from '../../utils/crypto'
|
||||
import type WebSocket from 'ws'
|
||||
|
||||
const debug = require('debug')('mtcute:ws')
|
||||
|
||||
let ws: {
|
||||
new (
|
||||
address: string,
|
||||
options?: string
|
||||
): WebSocket
|
||||
} | null
|
||||
if (typeof window === 'undefined' || typeof window.WebSocket === 'undefined') {
|
||||
try {
|
||||
ws = require('ws')
|
||||
} catch (e) {
|
||||
ws = null
|
||||
}
|
||||
} else {
|
||||
ws = window.WebSocket as any
|
||||
}
|
||||
|
||||
const subdomainsMap: Record<string, string> = {
|
||||
1: 'pluto',
|
||||
2: 'venus',
|
||||
3: 'aurora',
|
||||
4: 'vesta',
|
||||
5: 'flora',
|
||||
}
|
||||
|
||||
/**
|
||||
* Base for WebSocket transports.
|
||||
* Subclasses must provide packet codec in `_packetCodec` property
|
||||
*/
|
||||
export abstract class WebSocketTransport
|
||||
extends EventEmitter
|
||||
implements ICuteTransport {
|
||||
private _currentDc: tl.RawDcOption | null = null
|
||||
private _state: TransportState = TransportState.Idle
|
||||
private _socket: WebSocket | null = null
|
||||
private _crypto?: ICryptoProvider
|
||||
|
||||
abstract _packetCodec: PacketCodec
|
||||
packetCodecInitialized = false
|
||||
|
||||
private _baseDomain: string
|
||||
private _isTest: boolean
|
||||
private _subdomains: Record<string, string>
|
||||
|
||||
/**
|
||||
* @param isTest Whether this transport will be used for test DCs
|
||||
* @param baseDomain Base WebSocket domain
|
||||
* @param subdomains Map of sub-domains (key is DC ID, value is string)
|
||||
*/
|
||||
constructor(
|
||||
isTest = false,
|
||||
baseDomain = 'web.telegram.org',
|
||||
subdomains = subdomainsMap
|
||||
) {
|
||||
super()
|
||||
|
||||
if (!ws)
|
||||
throw new Error(
|
||||
'To use WebSocket transport with NodeJS, install `ws` package.'
|
||||
)
|
||||
|
||||
this._isTest = isTest
|
||||
this._baseDomain = baseDomain
|
||||
this._subdomains = subdomains
|
||||
|
||||
this.close = this.close.bind(this)
|
||||
}
|
||||
|
||||
setupCrypto(crypto: ICryptoProvider): void {
|
||||
this._crypto = crypto
|
||||
}
|
||||
|
||||
state(): TransportState {
|
||||
return this._state
|
||||
}
|
||||
|
||||
currentDc(): tl.RawDcOption | null {
|
||||
return this._currentDc
|
||||
}
|
||||
|
||||
connect(dc: tl.RawDcOption): void {
|
||||
if (this._state !== TransportState.Idle)
|
||||
throw new Error('Transport is not IDLE')
|
||||
|
||||
if (!this.packetCodecInitialized) {
|
||||
if (this._crypto) this._packetCodec.setupCrypto?.(this._crypto)
|
||||
this._packetCodec.on('error', (err) => this.emit('error', err))
|
||||
this._packetCodec.on('packet', (buf) => this.emit('message', buf))
|
||||
this.packetCodecInitialized = true
|
||||
}
|
||||
|
||||
this._state = TransportState.Connecting
|
||||
this._currentDc = dc
|
||||
this._socket = new ws!(
|
||||
`wss://${this._subdomains[dc.id]}.${this._baseDomain}/apiws${
|
||||
this._isTest ? '_test' : ''
|
||||
}`, 'binary'
|
||||
)
|
||||
|
||||
this._socket.addEventListener('message', (evt) =>
|
||||
this._packetCodec.feed(typedArrayToBuffer(evt.data))
|
||||
)
|
||||
this._socket.addEventListener('open', this.handleConnect.bind(this))
|
||||
this._socket.addEventListener('error', this.handleError.bind(this))
|
||||
this._socket.addEventListener('close', this.close)
|
||||
}
|
||||
|
||||
close(): void {
|
||||
if (this._state === TransportState.Idle) return
|
||||
debug('%s: close', this._currentDc!.ipAddress)
|
||||
|
||||
this.emit('close')
|
||||
this._state = TransportState.Idle
|
||||
this._socket!.removeEventListener('close', this.close)
|
||||
this._socket!.close()
|
||||
this._socket = null
|
||||
this._currentDc = null
|
||||
this._packetCodec.reset()
|
||||
}
|
||||
|
||||
async handleError({ error }: { error: Error }): Promise<void> {
|
||||
debug('%s: error: %s', this._currentDc!.ipAddress, error.stack)
|
||||
this.emit('error', error)
|
||||
}
|
||||
|
||||
async handleConnect(): Promise<void> {
|
||||
debug('%s: connected', this._currentDc!.ipAddress)
|
||||
const initialMessage = await this._packetCodec.tag()
|
||||
|
||||
this._socket!.send(initialMessage)
|
||||
this._state = TransportState.Ready
|
||||
this.emit('ready')
|
||||
}
|
||||
|
||||
async send(bytes: Buffer): Promise<void> {
|
||||
if (this._state !== TransportState.Ready)
|
||||
throw new Error('Transport is not READY')
|
||||
|
||||
const framed = await this._packetCodec.encode(bytes)
|
||||
|
||||
this._socket!.send(framed)
|
||||
}
|
||||
}
|
93
packages/core/src/network/transports/ws-obfuscated.ts
Normal file
93
packages/core/src/network/transports/ws-obfuscated.ts
Normal file
|
@ -0,0 +1,93 @@
|
|||
import { PacketCodec } from './abstract'
|
||||
import { ICryptoProvider, IEncryptionScheme } from '../../utils/crypto'
|
||||
import { EventEmitter } from 'events'
|
||||
import {
|
||||
buffersEqual,
|
||||
randomBytes,
|
||||
} from '../../utils/buffer-utils'
|
||||
import { WebSocketTransport } from './websocket'
|
||||
import { IntermediatePacketCodec } from './tcp-intermediate'
|
||||
|
||||
// initial payload can't start with these
|
||||
const BAD_HEADERS = [
|
||||
Buffer.from('GET', 'utf8'),
|
||||
Buffer.from('POST', 'utf8'),
|
||||
Buffer.from('HEAD', 'utf8'),
|
||||
Buffer.from('PVrG', 'utf8'),
|
||||
Buffer.from('eeeeeeee', 'hex'),
|
||||
]
|
||||
|
||||
export class ObfuscatedPacketCodec extends EventEmitter implements PacketCodec {
|
||||
private _inner: PacketCodec
|
||||
private _crypto: ICryptoProvider
|
||||
private _encryptor?: IEncryptionScheme
|
||||
private _decryptor?: IEncryptionScheme
|
||||
|
||||
constructor(inner: PacketCodec) {
|
||||
super()
|
||||
this._inner = inner
|
||||
this._inner.on('error', (err) => this.emit('error', err))
|
||||
this._inner.on('packet', (buf) => this.emit('packet', buf))
|
||||
}
|
||||
|
||||
setupCrypto(crypto: ICryptoProvider): void {
|
||||
this._crypto = crypto
|
||||
}
|
||||
|
||||
async tag(): Promise<Buffer> {
|
||||
let random: Buffer
|
||||
r: for (;;) {
|
||||
random = randomBytes(64)
|
||||
if (random[0] === 0xef) continue
|
||||
for (const h of BAD_HEADERS) {
|
||||
if (buffersEqual(random.slice(0, h.length), h)) continue r
|
||||
}
|
||||
if (random.readUInt32LE(4) === 0) continue
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
let innerTag = await this._inner.tag()
|
||||
if (innerTag.length !== 4) {
|
||||
const b = innerTag[0]
|
||||
innerTag = Buffer.from([b, b, b, b])
|
||||
}
|
||||
innerTag.copy(random, 56)
|
||||
|
||||
const randomRev = Buffer.from(random.slice(8, 56)).reverse()
|
||||
|
||||
const encryptKey = random.slice(8, 40)
|
||||
const encryptIv = random.slice(40, 56)
|
||||
|
||||
const decryptKey = randomRev.slice(0, 32)
|
||||
const decryptIv = randomRev.slice(32, 48)
|
||||
|
||||
this._encryptor = this._crypto.createAesCtr(encryptKey, encryptIv, true)
|
||||
this._decryptor = this._crypto.createAesCtr(decryptKey, decryptIv, false)
|
||||
|
||||
const encrypted = await this._encryptor.encrypt(random)
|
||||
encrypted.copy(random, 56, 56, 64)
|
||||
|
||||
return random
|
||||
}
|
||||
|
||||
async encode(packet: Buffer): Promise<Buffer> {
|
||||
return this._encryptor!.encrypt(await this._inner.encode(packet))
|
||||
}
|
||||
|
||||
feed(data: Buffer): void {
|
||||
const dec = this._decryptor!.decrypt(data)
|
||||
if (dec instanceof Buffer) this._inner.feed(dec)
|
||||
else dec.then((dec) => this._inner.feed(dec))
|
||||
}
|
||||
|
||||
reset(): void {
|
||||
this._inner.reset()
|
||||
delete this._encryptor
|
||||
delete this._decryptor
|
||||
}
|
||||
}
|
||||
|
||||
export class WebSocketObfuscatedTransport extends WebSocketTransport {
|
||||
_packetCodec = new ObfuscatedPacketCodec(new IntermediatePacketCodec())
|
||||
}
|
Loading…
Reference in a new issue