refactor: improved logging, moved to custom logger instead of debug

This commit is contained in:
teidesu 2021-08-14 12:57:26 +03:00
parent 64aae43572
commit 9b5ca0cb2a
27 changed files with 495 additions and 141 deletions

View file

@ -18,7 +18,6 @@
"@mtcute/file-id": "^1.0.0",
"eager-async-pool": "^1.0.0",
"file-type": "^16.2.0",
"big-integer": "1.6.48",
"debug": "^4.3.1"
"big-integer": "1.6.48"
}
}

View file

@ -228,6 +228,7 @@ import {
TelegramConnection,
} from '@mtcute/core'
import { tdFileId } from '@mtcute/file-id'
import { Logger } from '@mtcute/core/src/utils/logger'
export interface TelegramClient extends BaseTelegramClient {
/**
@ -3499,6 +3500,7 @@ export class TelegramClient extends BaseTelegramClient {
protected _catchUpChannels?: boolean
protected _cpts: Record<number, number>
protected _cptsMod: Record<number, number>
protected _updsLog: Logger
constructor(opts: BaseTelegramClient.Options) {
super(opts)
this._userId = null
@ -3522,6 +3524,8 @@ export class TelegramClient extends BaseTelegramClient {
this._cptsMod = {}
this._selfChanged = false
this._updsLog = this.log.create('updates')
}
acceptTos = acceptTos

View file

@ -66,3 +66,6 @@ import {
// @copy
import { tdFileId } from '@mtcute/file-id'
// @copy
import { Logger } from '@mtcute/core/src/utils/logger'

View file

@ -20,7 +20,6 @@ try {
path = require('path')
} catch (e) {}
const debug = require('debug')('mtcute:upload')
const OVERRIDE_MIME: Record<string, string> = {
// tg doesn't interpret `audio/opus` files as voice messages for some reason
@ -201,7 +200,7 @@ export async function uploadFile(
const hash = this._crypto.createMd5()
const partCount = ~~((fileSize + partSize - 1) / partSize)
debug(
this._baseLog.debug(
'uploading %d bytes file in %d chunks, each %d bytes',
fileSize,
partCount,

View file

@ -10,13 +10,12 @@ import {
getBarePeerId,
getMarkedPeerId,
markedPeerIdToBare,
MAX_CHANNEL_ID, RpcError,
MAX_CHANNEL_ID,
} from '@mtcute/core'
import { isDummyUpdate, isDummyUpdates } from '../utils/updates-utils'
import { isDummyUpdate } from '../utils/updates-utils'
import { ChatsIndex, UsersIndex } from '../types'
import { _parseUpdate } from '../utils/parse-update'
const debug = require('debug')('mtcute:upds')
import { Logger } from '@mtcute/core/src/utils/logger'
// code in this file is very bad, thanks to Telegram's awesome updates mechanism
@ -46,6 +45,8 @@ interface UpdatesState {
_cpts: Record<number, number>
_cptsMod: Record<number, number>
_updsLog: Logger
}
// @initialize
@ -62,6 +63,8 @@ function _initializeUpdates(this: TelegramClient) {
this._cptsMod = {}
this._selfChanged = false
this._updsLog = this.log.create('updates')
}
/**
@ -103,7 +106,7 @@ export async function _fetchUpdatesState(this: TelegramClient): Promise<void> {
this._date = state.date
this._seq = state.seq
debug(
this._updsLog.debug(
'loaded initial state: pts=%d, qts=%d, date=%d, seq=%d',
state.pts,
state.qts,
@ -463,9 +466,15 @@ async function _loadDifference(
switch (diff._) {
case 'updates.differenceEmpty':
this._updsLog.debug(
'updates.getDifference returned updates.differenceEmpty'
)
return
case 'updates.differenceTooLong':
this._pts = diff.pts
this._updsLog.debug(
'updates.getDifference returned updates.differenceTooLong'
)
return
}
@ -474,6 +483,16 @@ async function _loadDifference(
? diff.state
: diff.intermediateState
this._updsLog.debug(
'updates.getDifference returned %d messages, %d updates. new pts: %d, qts: %d, seq: %d, final: %b',
diff.newMessages.length,
diff.otherUpdates.length,
state.pts,
state.qts,
state.seq,
diff._ === 'updates.difference'
)
await this._cachePeersFrom(diff)
const { users, chats } = createUsersChatsIndex(diff)
@ -524,8 +543,23 @@ async function _loadDifference(
}
if (nextLocalPts) {
if (nextLocalPts > pts) continue
if (nextLocalPts > pts) {
this._updsLog.debug(
'ignoring %s (in channel %d) because already handled (by pts: exp %d, got %d)',
upd._,
cid,
nextLocalPts,
pts
)
continue
}
if (nextLocalPts < pts) {
this._updsLog.debug(
'fetching channel %d difference because gap detected (by pts: exp %d, got %d)',
cid,
nextLocalPts,
pts
)
await _loadChannelDifference.call(
this,
cid,
@ -550,6 +584,7 @@ async function _loadDifference(
this._pts = state.pts
this._qts = state.qts
this._seq = state.seq
this._date = state.date
if (diff._ === 'updates.difference') return
@ -568,6 +603,10 @@ async function _loadChannelDifference(
await this.resolvePeer(MAX_CHANNEL_ID - channelId)
)!
} catch (e) {
this._updsLog.warn(
'getChannelDifference failed for channel %d: input peer not found',
channelId
)
return
}
@ -591,7 +630,13 @@ async function _loadChannelDifference(
filter: { _: 'channelMessagesFilterEmpty' },
})
if (diff._ === 'updates.channelDifferenceEmpty') break
if (diff._ === 'updates.channelDifferenceEmpty') {
this._updsLog.debug(
'getChannelDifference (cid = %d) returned channelDifferenceEmpty',
channelId
)
break
}
await this._cachePeersFrom(diff)
@ -602,6 +647,13 @@ async function _loadChannelDifference(
pts = diff.dialog.pts!
}
this._updsLog.warn(
'getChannelDifference (cid = %d) returned channelDifferenceTooLong. new pts: %d, recent msgs: %d',
channelId,
pts,
diff.messages.length
)
diff.messages.forEach((message) => {
if (noDispatch && noDispatch.msg[channelId]?.[message.id])
return
@ -612,6 +664,15 @@ async function _loadChannelDifference(
break
}
this._updsLog.warn(
'getChannelDifference (cid = %d) returned %d messages, %d updates. new pts: %d, final: %b',
channelId,
diff.newMessages.length,
diff.otherUpdates.length,
pts,
diff.final
)
diff.newMessages.forEach((message) => {
if (noDispatch && noDispatch.msg[channelId]?.[message.id]) return
if (message._ === 'messageEmpty') return
@ -682,10 +743,24 @@ async function _processSingleUpdate(
}
if (nextLocalPts) {
if (nextLocalPts > pts)
if (nextLocalPts > pts) {
// "the update was already applied, and must be ignored"
this._updsLog.debug(
'ignoring %s (cid = %d) because already applied (by pts: exp %d, got %d)',
upd._,
channelId,
nextLocalPts,
pts
)
return
}
if (nextLocalPts < pts) {
this._updsLog.debug(
'fetching difference (cid = %d) because gap detected (by pts: exp %d, got %d)',
channelId,
nextLocalPts,
pts
)
if (channelId) {
// "there's an update gap that must be filled"
await _loadChannelDifference.call(
@ -709,18 +784,40 @@ async function _processSingleUpdate(
// qts is only used for non-channel updates
const nextLocalQts = this._qts! + 1
if (nextLocalQts > qts)
if (nextLocalQts > qts) {
// "the update was already applied, and must be ignored"
this._updsLog.debug(
'ignoring %s because already applied (by qts: exp %d, got %d)',
upd._,
nextLocalQts,
qts
)
return
if (nextLocalQts < qts)
}
if (nextLocalQts < qts) {
this._updsLog.debug(
'fetching difference because gap detected (by qts: exp %d, got %d)',
upd._,
nextLocalQts,
qts
)
return await _loadDifference.call(
this,
noDispatch ? _createNoDispatchIndex(upd) : undefined
)
}
}
// update local pts/qts
if (pts) {
this._updsLog.debug(
'received pts-ordered %s (cid = %d), new pts: %d',
upd._,
channelId,
pts
)
if (channelId) {
this._cpts[channelId] = pts
this._cptsMod[channelId] = pts
@ -730,6 +827,8 @@ async function _processSingleUpdate(
}
if (qts) {
this._updsLog.debug('received qts-ordered %s, new qts: %d', upd._, qts)
this._qts = qts
}
@ -765,6 +864,12 @@ async function _processSingleUpdate(
// this is a short update, let's fetch cached peers
peers = await _fetchPeersForShort.call(this, upd)
if (!peers) {
this._updsLog.debug(
'fetching difference because some peers were not available for short %s (pts = %d, cid = %d)',
upd._,
pts,
channelId
)
// some peer is not cached.
// need to re-fetch the thing, and cache them on the way
return await _loadDifference.call(this)
@ -785,7 +890,9 @@ export function _handleUpdate(
): void {
// just in case, check that updates state is available
if (this._pts === undefined) {
debug('received an update before updates state is available')
this._updsLog.warn(
'received an update before updates state is available'
)
return
}
@ -800,8 +907,6 @@ export function _handleUpdate(
this._updLock
.acquire()
.then(async () => {
debug('received %s', update._)
// i tried my best to follow the documentation, but i still may have missed something.
// feel free to contribute!
// reference: https://core.telegram.org/api/updates
@ -824,26 +929,41 @@ export function _handleUpdate(
// https://t.me/tdlibchat/5843
const nextLocalSeq = this._seq! + 1
debug(
this._updsLog.debug(
'received %s (seq_start = %d, seq_end = %d)',
update._,
seqStart,
update.seq
)
if (nextLocalSeq > seqStart)
if (nextLocalSeq > seqStart) {
this._updsLog.debug(
'ignoring updates group because already applied (by seq: exp %d, got %d)',
nextLocalSeq,
seqStart
)
// "the updates were already applied, and must be ignored"
return
if (nextLocalSeq < seqStart)
}
if (nextLocalSeq < seqStart) {
this._updsLog.debug(
'fetching difference because gap detected (by seq: exp %d, got %d)',
nextLocalSeq,
seqStart
)
// "there's an updates gap that must be filled"
// loading difference will also load any updates contained
// in this update, so we discard it
return await _loadDifference.call(this)
}
}
const hasMin = await this._cachePeersFrom(update)
if (hasMin) {
if (!(await _replaceMinPeers.call(this, update))) {
this._updsLog.debug(
'fetching difference because some peers were min and not cached'
)
// some min peer is not cached.
// need to re-fetch the thing, and cache them on the way
return await _loadDifference.call(this)
@ -1005,6 +1125,6 @@ export function catchUp(this: TelegramClient): Promise<void> {
/** @internal */
export function _keepAliveAction(this: TelegramClient): void {
debug('no updates for >15 minutes, catching up')
this._updsLog.debug('no updates for >15 minutes, catching up')
this.catchUp().catch((err) => this._emitError(err))
}

View file

@ -14,6 +14,7 @@
"browser": {
"./utils/platform/crypto.js": "./utils/platform/crypto.web.js",
"./utils/platform/transport.js": "./utils/platform/transport.web.js",
"./utils/platform/logging.js": "./utils/platform/logging.web.js",
"./storage/json-file.js": false
},
"dependencies": {
@ -23,7 +24,6 @@
"leemon": "6.2.0",
"pako": "2.0.2",
"big-integer": "1.6.48",
"debug": "^4.3.1",
"events": "3.2.0"
},
"devDependencies": {

View file

@ -1,7 +1,6 @@
import { BaseTelegramClient, defaultDcs } from '../src'
require('dotenv-flow').config({ path: __dirname + '/../' })
require('debug').enable('mtcute:*')
if (!process.env.API_ID || !process.env.API_HASH) {
console.warn('Set API_ID and API_HASH env variables')

View file

@ -38,8 +38,7 @@ import { BinaryWriter } from './utils/binary/binary-writer'
import { encodeUrlSafeBase64, parseUrlSafeBase64 } from './utils/buffer-utils'
import { BinaryReader } from './utils/binary/binary-reader'
import EventEmitter from 'events'
const debug = require('debug')('mtcute:base')
import { LogManager } from './utils/logger'
export namespace BaseTelegramClient {
export interface Options {
@ -269,6 +268,9 @@ export class BaseTelegramClient extends EventEmitter {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
protected _handleUpdate(update: tl.TypeUpdates): void {}
readonly log = new LogManager()
protected readonly _baseLog = this.log.create('base')
constructor(opts: BaseTelegramClient.Options) {
super()
@ -299,6 +301,8 @@ export class BaseTelegramClient extends EventEmitter {
this._disableUpdates = opts.disableUpdates ?? false
this._niceStacks = opts.niceStacks ?? true
this.storage.setup?.(this._baseLog)
this._layer = opts.overrideLayer ?? tl.CURRENT_LAYER
let deviceModel = 'MTCute on '
@ -361,7 +365,7 @@ export class BaseTelegramClient extends EventEmitter {
testMode: this._testMode,
reconnectionStrategy: this._reconnectionStrategy,
layer: this._layer,
})
}, this.log.create('connection'))
this.primaryConnection.on('usable', async (isReconnection: boolean) => {
this._lastUpdateTime = Date.now()
@ -595,7 +599,7 @@ export class BaseTelegramClient extends EventEmitter {
lastError = e
if (e instanceof InternalError) {
debug('Telegram is having internal issues: %s', e)
this._baseLog.warn('Telegram is having internal issues: %s', e)
if (e.message === 'WORKER_BUSY_TOO_LONG_RETRY') {
// according to tdlib, "it is dangerous to resend query without timeout, so use 1"
await sleep(1000)
@ -624,7 +628,7 @@ export class BaseTelegramClient extends EventEmitter {
params?.throwFlood !== true &&
e.seconds <= this._floodSleepThreshold
) {
debug('Flood wait for %d seconds', e.seconds)
this._baseLog.info('Flood wait for %d seconds', e.seconds)
await sleep(e.seconds * 1000)
continue
}
@ -636,14 +640,14 @@ export class BaseTelegramClient extends EventEmitter {
e.constructor === UserMigrateError ||
e.constructor === NetworkMigrateError
) {
debug('Migrate error, new dc = %d', e.newDc)
this._baseLog.info('Migrate error, new dc = %d', e.newDc)
await this.changeDc(e.newDc)
continue
}
} else {
if (e.constructor === AuthKeyUnregisteredError) {
// we can try re-exporting auth from the primary connection
debug('exported auth key error, re-exporting..')
this._baseLog.warn('exported auth key error, re-exporting..')
const auth = await this.call({
_: 'auth.exportAuthorization',
@ -700,14 +704,14 @@ export class BaseTelegramClient extends EventEmitter {
reconnectionStrategy: this._reconnectionStrategy,
inactivityTimeout,
layer: this._layer,
})
}, this.log.create('connection'))
connection.on('error', (err) => this._emitError(err, connection))
connection.authKey = await this.storage.getAuthKeyFor(dc.id)
connection.connect()
if (!connection.authKey) {
debug('exporting auth to DC %d', dcId)
this._baseLog.info('exporting auth to DC %d', dcId)
const auth = await this.call({
_: 'auth.exportAuthorization',
dcId,
@ -807,7 +811,7 @@ export class BaseTelegramClient extends EventEmitter {
const parsedPeers: ITelegramStorage.PeerInfo[] = []
let hadMin = false
let count = 0
for (const peer of getAllPeersFrom(obj)) {
if ((peer as any).min) {
// absolutely incredible min peer handling, courtesy of levlam.
@ -816,6 +820,8 @@ export class BaseTelegramClient extends EventEmitter {
continue
}
count += 1
switch (peer._) {
case 'user':
parsedPeers.push({
@ -854,6 +860,10 @@ export class BaseTelegramClient extends EventEmitter {
await this.storage.updatePeers(parsedPeers)
if (count > 0) {
this._baseLog.debug('cached %d peers, had min: %b', count, hadMin)
}
return hadMin
}

View file

@ -16,8 +16,6 @@ import {
bufferToBigInt,
} from '../utils/bigint-utils'
const debug = require('debug')('mtcute:auth')
// Heavily based on code from https://github.com/LonamiWebs/Telethon/blob/master/telethon/network/authenticator.py
const DH_SAFETY_RANGE = bigIntTwo.pow(2048 - 64)
@ -45,9 +43,11 @@ export async function doAuthorization(
return connection.send(writer.result())
}
const log = connection.log.create('auth')
const nonce = randomBytes(16)
// Step 1: PQ request
debug(
log.debug(
'%s: starting PQ handshake, nonce = %h',
connection.params.dc.ipAddress,
nonce
@ -61,7 +61,7 @@ export async function doAuthorization(
if (resPq._ !== 'mt_resPQ') throw new Error('Step 1: answer was ' + resPq._)
if (!buffersEqual(resPq.nonce, nonce))
throw new Error('Step 1: invalid nonce from server')
debug('%s: received PQ', connection.params.dc.ipAddress)
log.debug('%s: received PQ', connection.params.dc.ipAddress)
// Step 2: DH exchange
const publicKey = findKeyByFingerprints(resPq.serverPublicKeyFingerprints)
@ -72,14 +72,14 @@ export async function doAuthorization(
.map((i) => i.toString(16))
.join(', ')
)
debug(
log.debug(
'%s: found server key, fp = %s',
connection.params.dc.ipAddress,
publicKey.fingerprint
)
const [p, q] = await crypto.factorizePQ(resPq.pq)
debug('%s: factorized PQ', connection.params.dc.ipAddress)
log.debug('%s: factorized PQ', connection.params.dc.ipAddress)
const newNonce = randomBytes(32)
@ -98,7 +98,7 @@ export async function doAuthorization(
dc: dcId
} as tl.mtproto.RawP_q_inner_data_dc)
const encryptedData = await crypto.rsaEncrypt(pqInnerData, publicKey)
debug('%s: requesting DH params', connection.params.dc.ipAddress)
log.debug('%s: requesting DH params', connection.params.dc.ipAddress)
await sendPlainMessage({
_: 'mt_reqDHParams',
@ -131,7 +131,7 @@ export async function doAuthorization(
// throw new Error('Step 2: server DH failed')
// }
debug('%s: server DH ok', connection.params.dc.ipAddress)
log.debug('%s: server DH ok', connection.params.dc.ipAddress)
if (serverDhParams.encryptedAnswer.length % 16 != 0)
throw new Error('Step 2: AES block size is invalid')
@ -234,7 +234,7 @@ export async function doAuthorization(
clientDhInnerWriter.pos = 0
clientDhInnerWriter.raw(clientDhInnerHash)
debug(
log.debug(
'%s: sending client DH (timeOffset = %d)',
connection.params.dc.ipAddress,
timeOffset
@ -260,7 +260,7 @@ export async function doAuthorization(
if (!buffersEqual(dhGen.serverNonce, resPq.serverNonce))
throw Error('Step 4: invalid server nonce from server')
debug('%s: DH result: %s', connection.params.dc.ipAddress, dhGen._)
log.debug('%s: DH result: %s', connection.params.dc.ipAddress, dhGen._)
if (dhGen._ === 'mt_dh_gen_fail') {
// in theory i would be supposed to calculate newNonceHash, but why, we are failing anyway
@ -284,7 +284,7 @@ export async function doAuthorization(
if (!buffersEqual(expectedHash.slice(4, 20), dhGen.newNonceHash1))
throw Error('Step 4: invalid nonce hash from server')
debug('%s: authorization successful', connection.params.dc.ipAddress)
log.info('%s: authorization successful', connection.params.dc.ipAddress)
return [authKey, serverSalt, timeOffset]
}

View file

@ -8,8 +8,7 @@ import {
SerializationCounter,
} from '../utils/binary/binary-writer'
import { BinaryReader } from '../utils/binary/binary-reader'
const debug = require('debug')('mtcute:sess')
import { Logger } from '../utils/logger'
export interface EncryptedMessage {
messageId: BigInteger
@ -34,7 +33,7 @@ export class MtprotoSession {
// default salt: [0x00]*8
serverSalt: Buffer = Buffer.alloc(8)
constructor(crypto: ICryptoProvider) {
constructor(crypto: ICryptoProvider, readonly log: Logger) {
this._crypto = crypto
}
@ -114,7 +113,7 @@ export class MtprotoSession {
let encryptedData = reader.raw()
if (!buffersEqual(authKeyId, this._authKeyId!)) {
debug(
this.log.warn(
'[%h] warn: received message with unknown authKey = %h (expected %h)',
this._sessionId,
authKeyId,
@ -144,8 +143,8 @@ export class MtprotoSession {
)
).slice(8, 24)
if (!buffersEqual(messageKey, expectedMessageKey)) {
debug(
'[%h] warn: received message with invalid messageKey = %h (expected %h)',
this.log.warn(
'[%h] received message with invalid messageKey = %h (expected %h)',
this._sessionId,
messageKey,
expectedMessageKey
@ -159,8 +158,8 @@ export class MtprotoSession {
const messageId = innerReader.long(true)
if (!buffersEqual(sessionId, this._sessionId)) {
debug(
'warn: ignoring message with invalid sessionId = %h',
this.log.warn(
'ignoring message with invalid sessionId = %h',
sessionId
)
return null
@ -170,8 +169,8 @@ export class MtprotoSession {
const length = innerReader.uint32()
if (length > innerData.length - 32 /* header size */) {
debug(
'warn: ignoring message with invalid length: %d > %d',
this.log.warn(
'ignoring message with invalid length: %d > %d',
length,
innerData.length - 32
)
@ -179,8 +178,8 @@ export class MtprotoSession {
}
if (length % 4 !== 0) {
debug(
'warn: ignoring message with invalid length: %d is not a multiple of 4',
this.log.warn(
'ignoring message with invalid length: %d is not a multiple of 4',
length
)
return null
@ -191,8 +190,8 @@ export class MtprotoSession {
const paddingSize = innerData.length - innerReader.pos
if (paddingSize < 12 || paddingSize > 1024) {
debug(
'warn: ignoring message with invalid padding size: %d',
this.log.warn(
'ignoring message with invalid padding size: %d',
paddingSize
)
return null

View file

@ -7,8 +7,7 @@ import {
createControllablePromise,
} from '../utils/controllable-promise'
import { ICryptoProvider } from '../utils/crypto'
const debug = require('debug')('mtcute:conn')
import { Logger } from '../utils/logger'
export interface PersistentConnectionParams {
crypto: ICryptoProvider
@ -51,7 +50,7 @@ export abstract class PersistentConnection extends EventEmitter {
protected abstract onMessage(data: Buffer): void
protected constructor(params: PersistentConnectionParams) {
protected constructor(params: PersistentConnectionParams, readonly log: Logger) {
super()
this.params = params
this.changeTransport(params.transportFactory)
@ -63,7 +62,7 @@ export abstract class PersistentConnection extends EventEmitter {
}
this._transport = factory()
this._transport.setupCrypto?.(this.params.crypto)
this._transport.setup?.(this.params.crypto, this.log)
this._transport.on('ready', this.onTransportReady.bind(this))
this._transport.on('message', this.onTransportMessage.bind(this))
@ -175,7 +174,7 @@ export abstract class PersistentConnection extends EventEmitter {
if (!this.params.inactivityTimeout) return
if (this._inactivityTimeout) clearTimeout(this._inactivityTimeout)
this._inactivityTimeout = setTimeout(() => {
debug(
this.log.info(
'disconnected because of inactivity for %d',
this.params.inactivityTimeout
)

View file

@ -23,6 +23,7 @@ import {
RpcTimeoutError,
} from '@mtcute/tl/errors'
import { LruStringSet } from '../utils/lru-string-set'
import { Logger } from '../utils/logger'
function makeNiceStack(error: RpcError, stack: string, method?: string) {
error.stack = `${error.constructor.name} (${error.code} ${error.text}): ${
@ -30,14 +31,6 @@ function makeNiceStack(error: RpcError, stack: string, method?: string) {
}\n at ${method}\n${stack.split('\n').slice(2).join('\n')}`
}
const _debug = require('debug')
const debug = _debug('mtcute:conn')
// hex formatting buffers with %h
_debug.formatters.h = (v: Buffer): string => v.toString('hex')
// true/false formatting with %b
_debug.formatters.b = (v: any): string => !!v + ''
export interface TelegramConnectionParams extends PersistentConnectionParams {
initConnection: tl.RawInitConnectionRequest
inactivityTimeout?: number
@ -112,9 +105,9 @@ export class TelegramConnection extends PersistentConnection {
this._pendingAcks = []
}, 500)
constructor(params: TelegramConnectionParams) {
super(params)
this._mtproto = new MtprotoSession(this.params.crypto)
constructor(params: TelegramConnectionParams, log: Logger) {
super(params, log)
this._mtproto = new MtprotoSession(this.params.crypto, log.create('session'))
}
onTransportClose(): void {
@ -163,7 +156,7 @@ export class TelegramConnection extends PersistentConnection {
protected onError(error: Error): void {
// https://core.telegram.org/mtproto/mtproto-_transports#_transport-errors
if (error instanceof TransportError) {
debug('transport error %d (dc %d)', error.code, this.params.dc.id)
this.log.error('transport error %d (dc %d)', error.code, this.params.dc.id)
if (error.code === 404) {
this._mtproto.reset()
this.emit('key-change', null)
@ -221,7 +214,7 @@ export class TelegramConnection extends PersistentConnection {
// if a message is received before authorization,
// either the server is misbehaving,
// or there was a problem with authorization.
debug('warn: received message before authorization')
this.log.warn('warn: received message before authorization')
return
}
@ -241,7 +234,7 @@ export class TelegramConnection extends PersistentConnection {
}
private _resend(it: PendingMessage, id?: string): void {
debug('resending %s', it.method)
this.log.debug('resending %s', it.method)
this._sendBufferForResult(it).catch(it.promise.reject)
if (it.cancel) clearTimeout(it.cancel)
if (id) delete this._pendingRpcCalls[id]
@ -260,7 +253,7 @@ export class TelegramConnection extends PersistentConnection {
this.onConnectionUsable()
})
.catch((err) => {
debug('Authorization error: %s', err.message)
this.log.error('Authorization error: %s', err.message)
this.onError(err)
this.reconnect()
})
@ -271,7 +264,7 @@ export class TelegramConnection extends PersistentConnection {
messageId: BigInteger
): Promise<void> {
if (messageId.isEven()) {
debug(
this.log.warn(
'warn: ignoring message with invalid messageId = %s (is even)',
messageId
)
@ -294,7 +287,7 @@ export class TelegramConnection extends PersistentConnection {
return
}
debug('unknown message received: %o', message)
this.log.warn('unknown message received: %o', message)
}
private _handleContainer(message: tl.mtproto.RawMsg_container): void {
@ -310,10 +303,10 @@ export class TelegramConnection extends PersistentConnection {
const reqMsgId = message.reqMsgId.toString(16)
const pending = this._pendingRpcCalls[reqMsgId]
if (!pending) {
debug('received rpc result for unknown message %s', reqMsgId)
this.log.warn('received rpc result for unknown message %s', reqMsgId)
return
}
debug('handling rpc result for %s (%s)', reqMsgId, pending.method)
this.log.request('<<< (%s) %j', pending.method, message.result)
if (message.result._ === 'mt_rpc_error') {
const error = createRpcErrorFromTl(message.result)
@ -332,7 +325,7 @@ export class TelegramConnection extends PersistentConnection {
private _handlePong(message: tl.mtproto.RawPong): void {
const msgId = message.msgId.toString(16)
debug('handling pong for %s (ping id %s)', msgId, message.pingId)
this.log.debug('handling pong for %s (ping id %s)', msgId, message.pingId)
if (this._pendingPing && message.pingId.eq(this._pendingPing)) {
this._pendingPing = null
@ -347,7 +340,7 @@ export class TelegramConnection extends PersistentConnection {
pending.promise.resolve(message)
delete this._pendingRpcCalls[msgId]
} else {
debug('pong to unknown ping %o', message)
this.log.warn('pong to unknown ping %o', message)
}
}
@ -356,7 +349,7 @@ export class TelegramConnection extends PersistentConnection {
): Promise<void> {
const badMsgId = message.badMsgId.toString(16)
debug(
this.log.debug(
'handling bad_server_salt for msg %s, new salt: %h',
badMsgId,
message.newServerSalt
@ -375,7 +368,7 @@ export class TelegramConnection extends PersistentConnection {
this._pendingPing = null
this._pendingPingMsgId = null
} else {
debug('bad_server_salt to unknown message %o', message)
this.log.warn('bad_server_salt to unknown message %o', message)
}
}
@ -385,7 +378,7 @@ export class TelegramConnection extends PersistentConnection {
): Promise<void> {
const badMsgId = message.badMsgId.toString(16)
debug('handling bad_msg_notification, code: %d', message.errorCode)
this.log.debug('handling bad_msg_notification, code: %d', message.errorCode)
if (message.errorCode === 16 || message.errorCode === 17) {
// msg_id is either too high or too low
@ -415,7 +408,7 @@ export class TelegramConnection extends PersistentConnection {
if (this._pendingRpcCalls[badMsgId]) {
this._resend(this._pendingRpcCalls[badMsgId], badMsgId)
} else {
debug('bad_msg_notification to unknown message %s', badMsgId)
this.log.warn('bad_msg_notification to unknown message %s', badMsgId)
}
}
@ -452,7 +445,7 @@ export class TelegramConnection extends PersistentConnection {
}
}
debug(
this.log.debug(
'handling new_session_created (sid = %h), salt: %h',
this._mtproto._sessionId,
message.serverSalt
@ -463,7 +456,7 @@ export class TelegramConnection extends PersistentConnection {
private _handleDetailedInfo(
message: tl.mtproto.RawMsg_detailed_info
): void {
debug(
this.log.debug(
'handling msg_detailed_info (sid = %h), msgId = %s',
this._mtproto._sessionId,
message.answerMsgId
@ -487,7 +480,7 @@ export class TelegramConnection extends PersistentConnection {
private _handleNewDetailedInfo(
message: tl.mtproto.RawMsg_new_detailed_info
): void {
debug(
this.log.debug(
'handling msg_new_detailed_info (sid = %h), msgId = %s',
this._mtproto._sessionId,
message.answerMsgId
@ -510,7 +503,7 @@ export class TelegramConnection extends PersistentConnection {
private _handleFutureSalts(message: tl.mtproto.RawFuture_salts): void {
// TODO actually handle these salts
debug(
this.log.debug(
'handling future_salts (sid = %h), msgId = %s, %d salts',
this._mtproto._sessionId,
message.reqMsgId,
@ -663,11 +656,11 @@ export class TelegramConnection extends PersistentConnection {
if (this._usable && this.params.inactivityTimeout)
this._rescheduleInactivity()
debug('making rpc call %s', message._)
this.log.request('>>> %j', message)
let obj: tl.TlObject = message
if (!this._initConnectionCalled) {
debug('wrapping %s with initConnection', message._)
this.log.debug('wrapping %s with initConnection', message._)
obj = {
_: 'invokeWithLayer',
layer: this.params.layer,

View file

@ -2,6 +2,7 @@ import { tl } from '@mtcute/tl'
import { MaybeAsync } from '../../types/utils'
import { ICryptoProvider } from '../../utils/crypto'
import EventEmitter from 'events'
import { Logger } from '../../utils/logger'
export enum TransportState {
/**
@ -50,10 +51,12 @@ export interface ITelegramTransport extends EventEmitter {
send(data: Buffer): Promise<void>
/**
* For transports whose codecs use crypto functions.
* Provides crypto and logging for the transport.
* Not done in constructor to simplify factory.
*
* This method is called before any other.
*/
setupCrypto?(crypto: ICryptoProvider): void
setup?(crypto: ICryptoProvider, log: Logger): void
}
/** Transport factory function */
@ -88,10 +91,10 @@ export interface IPacketCodec {
on(event: 'packet', handler: (packet: Buffer) => void): void
/**
* For codecs that use crypto functions.
* For codecs that use crypto functions and/or logging.
* This method is called before any other.
*/
setupCrypto?(crypto: ICryptoProvider): void
setup?(crypto: ICryptoProvider, log: Logger): void
}
/**

View file

@ -4,8 +4,7 @@ import { Socket, connect } from 'net'
import EventEmitter from 'events'
import { ICryptoProvider } from '../../utils/crypto'
import { IntermediatePacketCodec } from './intermediate'
const debug = require('debug')('mtcute:tcp')
import { Logger } from '../../utils/logger'
/**
* Base for TCP transports.
@ -20,11 +19,13 @@ export abstract class BaseTcpTransport
abstract _packetCodec: IPacketCodec
protected _crypto!: ICryptoProvider
protected log!: Logger
packetCodecInitialized = false
setupCrypto(crypto: ICryptoProvider): void {
setup(crypto: ICryptoProvider, log: Logger): void {
this._crypto = crypto
this.log = log.create('tcp')
}
state(): TransportState {
@ -35,12 +36,13 @@ export abstract class BaseTcpTransport
return this._currentDc
}
// eslint-disable-next-line @typescript-eslint/no-unused-vars
connect(dc: tl.RawDcOption, testMode: boolean): void {
if (this._state !== TransportState.Idle)
throw new Error('Transport is not IDLE')
if (!this.packetCodecInitialized) {
this._packetCodec.setupCrypto?.(this._crypto)
this._packetCodec.setup?.(this._crypto, this.log)
this._packetCodec.on('error', (err) => this.emit('error', err))
this._packetCodec.on('packet', (buf) => this.emit('message', buf))
this.packetCodecInitialized = true
@ -61,7 +63,7 @@ export abstract class BaseTcpTransport
close(): void {
if (this._state === TransportState.Idle) return
debug('%s: close', this._currentDc!.ipAddress)
this.log.debug('%s: close', this._currentDc!.ipAddress)
this.emit('close')
this._state = TransportState.Idle
@ -73,12 +75,12 @@ export abstract class BaseTcpTransport
}
async handleError(error: Error): Promise<void> {
debug('%s: error: %s', this._currentDc!.ipAddress, error.stack)
this.log.error('%s: error: %s', this._currentDc!.ipAddress, error.stack)
this.emit('error', error)
}
async handleConnect(): Promise<void> {
debug('%s: connected', this._currentDc!.ipAddress)
this.log.debug('%s: connected', this._currentDc!.ipAddress)
const initialMessage = await this._packetCodec.tag()
if (initialMessage.length) {

View file

@ -6,8 +6,7 @@ import { ICryptoProvider } from '../../utils/crypto'
import type WebSocket from 'ws'
import { IntermediatePacketCodec } from './intermediate'
import { ObfuscatedPacketCodec } from './obfuscated'
const debug = require('debug')('mtcute:ws')
import { Logger } from '../../utils/logger'
let ws: {
new (address: string, options?: string): WebSocket
@ -41,6 +40,7 @@ export abstract class BaseWebSocketTransport
private _state: TransportState = TransportState.Idle
private _socket: WebSocket | null = null
private _crypto!: ICryptoProvider
protected log!: Logger
abstract _packetCodec: IPacketCodec
packetCodecInitialized = false
@ -69,8 +69,9 @@ export abstract class BaseWebSocketTransport
this.close = this.close.bind(this)
}
setupCrypto(crypto: ICryptoProvider): void {
setup(crypto: ICryptoProvider, log: Logger): void {
this._crypto = crypto
this.log = log.create('tcp')
}
state(): TransportState {
@ -86,7 +87,7 @@ export abstract class BaseWebSocketTransport
throw new Error('Transport is not IDLE')
if (!this.packetCodecInitialized) {
this._packetCodec.setupCrypto?.(this._crypto)
this._packetCodec.setup?.(this._crypto, this.log)
this._packetCodec.on('error', (err) => this.emit('error', err))
this._packetCodec.on('packet', (buf) => this.emit('message', buf))
this.packetCodecInitialized = true
@ -113,7 +114,7 @@ export abstract class BaseWebSocketTransport
close(): void {
if (this._state === TransportState.Idle) return
debug('%s: close', this._currentDc!.ipAddress)
this.log.debug('%s: close', this._currentDc!.ipAddress)
this.emit('close')
this._state = TransportState.Idle
@ -125,12 +126,12 @@ export abstract class BaseWebSocketTransport
}
async handleError({ error }: { error: Error }): Promise<void> {
debug('%s: error: %s', this._currentDc!.ipAddress, error.stack)
this.log.error('%s: error: %s', this._currentDc!.ipAddress, error.stack)
this.emit('error', error)
}
async handleConnect(): Promise<void> {
debug('%s: connected', this._currentDc!.ipAddress)
this.log.debug('%s: connected', this._currentDc!.ipAddress)
const initialMessage = await this._packetCodec.tag()
this._socket!.send(initialMessage)

View file

@ -1,6 +1,7 @@
import EventEmitter from 'events'
import { IPacketCodec } from './abstract'
import { ICryptoProvider } from '../../utils/crypto'
import { Logger } from '../../utils/logger'
export abstract class WrappedCodec extends EventEmitter {
protected _crypto!: ICryptoProvider
@ -20,8 +21,8 @@ export abstract class WrappedCodec extends EventEmitter {
return this
}
setupCrypto(crypto: ICryptoProvider): void {
setup(crypto: ICryptoProvider, log: Logger): void {
this._crypto = crypto
this._inner.setupCrypto?.(crypto)
this._inner.setup?.(crypto, log)
}
}

View file

@ -1,5 +1,6 @@
import { BasicPeerType, MaybeAsync } from '../types'
import { tl } from '@mtcute/tl'
import { Logger } from '../utils/logger'
export namespace ITelegramStorage {
export interface PeerInfo {
@ -31,6 +32,12 @@ export namespace ITelegramStorage {
* write updates to the disk when `save()` is called.
*/
export interface ITelegramStorage {
/**
* This method is called before any other.
* For storages that use logging, logger instance.
*/
setup?(log: Logger): void
/**
* Load session from some external storage.
* Should be used either to load session content from file/network/etc

View file

@ -0,0 +1,115 @@
import { _defaultLoggingHandler } from './platform/logging'
let defaultLogLevel = 2
if (typeof process !== 'undefined') {
const envLogLevel = parseInt(process.env.MTCUTE_LOG_LEVEL!)
if (!isNaN(envLogLevel)) {
defaultLogLevel = envLogLevel
}
} else if (typeof localStorage !== 'undefined') {
const localLogLevel = parseInt(localStorage.MTCUTE_LOG_LEVEL)
if (!isNaN(localLogLevel)) {
defaultLogLevel = localLogLevel
}
}
const FORMATTER_RE = /%[a-zA-Z]/g
/**
* Logger created by {@link LogManager}
*/
export class Logger {
private color: number
constructor(readonly mgr: LogManager, readonly tag: string) {
let hash = 0
for (let i = 0; i < tag.length; i++) {
hash = (hash << 5) - hash + tag.charCodeAt(i)
hash |= 0 // convert to 32bit int
}
this.color = Math.abs(hash) % 6
}
log(level: number, fmt: string, ...args: any[]): void {
if (level > this.mgr.level) return
// custom formatters
if (
fmt.indexOf('%h') > -1 ||
fmt.indexOf('%b') > -1 ||
fmt.indexOf('%j') > -1
) {
let idx = 0
fmt = fmt.replace(FORMATTER_RE, (m) => {
if (m === '%h' || m === '%b' || m === '%j') {
const val = args[idx]
args.splice(idx, 1)
if (m === '%h') return val.toString('hex')
if (m === '%b') return !!val + ''
if (m === '%j') return JSON.stringify(val)
}
idx++
return m
})
}
this.mgr.handler(this.color, level, this.tag, fmt, args)
}
readonly error = this.log.bind(this, LogManager.ERROR)
readonly warn = this.log.bind(this, LogManager.WARN)
readonly info = this.log.bind(this, LogManager.INFO)
readonly debug = this.log.bind(this, LogManager.DEBUG)
readonly request = this.log.bind(this, LogManager.REQUEST)
/**
* Create a {@link Logger} with the given tag
* from the same {@link LogManager} as the current
* Logger.
*
* @param tag Logger tag
*/
create(tag: string): Logger {
return this.mgr.create(tag)
}
}
/**
* Log manager.
*
* Does nothing by itself, but allows managing instance logs
*/
export class LogManager {
static OFF = 0
static ERROR = 1
static WARN = 2
static INFO = 3
static DEBUG = 4
static REQUEST = 5
private _cache: Record<string, Logger> = {}
level = defaultLogLevel
handler = _defaultLoggingHandler
disable(): void {
this.level = 0
}
/**
* Create a {@link Logger} with the given tag
*
* @param tag Logger tag
*/
create(tag: string): Logger {
if (!(tag in this._cache)) {
this._cache[tag] = new Logger(this, tag)
}
return this._cache[tag]
}
}

View file

@ -0,0 +1,57 @@
import { isatty } from 'tty'
const isTty = isatty(process.stdout.fd)
const BASE_FORMAT = isTty ? '[%s] [%s] %s%s\x1b[0m - ' : '[%s] [%s] %s - '
const LEVEL_NAMES = isTty
? [
'', // OFF
'\x1b[31mERR\x1b[0m',
'\x1b[33mWRN\x1b[0m',
'\x1b[34mINF\x1b[0m',
'\x1b[36mDBG\x1b[0m',
'\x1b[35mREQ\x1b[0m',
]
: [
'', // OFF
'ERR',
'WRN',
'INF',
'DBG',
'REQ',
]
const TAG_COLORS = [6, 2, 3, 4, 5, 1].map((i) => `\x1b[3${i};1m`)
/** @internal */
export const _defaultLoggingHandler = isTty
? (
color: number,
level: number,
tag: string,
fmt: string,
args: any[]
): void => {
console.log(
BASE_FORMAT + fmt,
new Date().toISOString(),
LEVEL_NAMES[level],
TAG_COLORS[color],
tag,
...args
)
}
: (
color: number,
level: number,
tag: string,
fmt: string,
args: any[]
): void => {
console.log(
BASE_FORMAT + fmt,
new Date().toISOString(),
LEVEL_NAMES[level],
tag,
...args
)
}

View file

@ -0,0 +1,46 @@
const BASE_FORMAT = '[%s] [%с%s%с] %c%s%c - '
const LEVEL_NAMES = [
'', // OFF
'ERR',
'WRN',
'INF',
'DBG',
'REQ'
]
const COLORS = [
'', // OFF
'#ff0000',
'#ffff00',
'#0000ff',
'#00ffff',
'#ff00ff',
]
const TAG_COLORS = [
'#44ffff',
'#44ff44',
'#ffff44',
'#4444ff',
'#ff44ff',
'#ff4444',
]
/** @internal */
export const _defaultLoggingHandler = (
color: number,
level: number,
tag: string,
fmt: string,
args: any[]
): void => {
console.log(
BASE_FORMAT + fmt,
new Date().toISOString(),
COLORS[level],
LEVEL_NAMES[level],
'',
TAG_COLORS[color],
tag,
'',
...args
)
}

View file

@ -15,7 +15,6 @@
"@mtcute/tl": "~1.131.0",
"@mtcute/core": "^1.0.0",
"@mtcute/client": "^1.0.0",
"events": "^3.2.0",
"debug": "^4.3.1"
"events": "^3.2.0"
}
}

View file

@ -7,8 +7,6 @@ import {
import { connect as connectTcp } from 'net'
import { connect as connectTls, SecureContextOptions } from 'tls'
const debug = require('debug')('mtcute:http-proxy')
/**
* An error has occurred while connecting to an HTTP(s) proxy
*/
@ -105,7 +103,7 @@ export abstract class BaseHttpProxyTcpTransport extends BaseTcpTransport {
}
private _onProxyConnected() {
debug(
this.log.debug(
'[%s:%d] connected to proxy, sending CONNECT',
this._proxy.host,
this._proxy.port
@ -135,7 +133,7 @@ export abstract class BaseHttpProxyTcpTransport extends BaseTcpTransport {
this._socket!.write(packet)
this._socket!.once('data', (msg) => {
debug(
this.log.debug(
'[%s:%d] CONNECT resulted in: %s',
this._proxy.host,
this._proxy.port,

View file

@ -12,7 +12,6 @@
"build": "tsc"
},
"dependencies": {
"@mtcute/core": "^1.0.0",
"debug": "^4.3.1"
"@mtcute/core": "^1.0.0"
}
}

View file

@ -9,8 +9,6 @@ import { connect } from 'net'
// @ts-ignore
import { normalize } from 'ip6'
const debug = require('debug')('mtcute:socks-proxy')
/**
* An error has occurred while connecting to an SOCKS proxy
*/
@ -234,7 +232,7 @@ export abstract class BaseSocksTcpTransport extends BaseTcpTransport {
}
const code = msg[1]
debug(
this.log.debug(
'[%s:%d] CONNECT returned code %d',
this._proxy.host,
this._proxy.port,
@ -259,7 +257,7 @@ export abstract class BaseSocksTcpTransport extends BaseTcpTransport {
}
}
debug(
this.log.debug(
'[%s:%d] connected to proxy, sending CONNECT',
this._proxy.host,
this._proxy.port
@ -280,7 +278,7 @@ export abstract class BaseSocksTcpTransport extends BaseTcpTransport {
let state: 'greeting' | 'auth' | 'connect' = 'greeting'
const sendConnect = () => {
debug(
this.log.debug(
'[%s:%d] sending CONNECT',
this._proxy.host,
this._proxy.port
@ -317,7 +315,7 @@ export abstract class BaseSocksTcpTransport extends BaseTcpTransport {
const chosen = msg[1]
debug(
this.log.debug(
'[%s:%d] GREETING returned auth method %d',
this._proxy.host,
this._proxy.port,
@ -386,7 +384,7 @@ export abstract class BaseSocksTcpTransport extends BaseTcpTransport {
return
}
debug(
this.log.debug(
'[%s:%d] AUTH returned code %d',
this._proxy.host,
this._proxy.port,
@ -421,7 +419,7 @@ export abstract class BaseSocksTcpTransport extends BaseTcpTransport {
const code = msg[1]
debug(
this.log.debug(
'[%s:%d] CONNECT returned code %d',
this._proxy.host,
this._proxy.port,
@ -451,7 +449,7 @@ export abstract class BaseSocksTcpTransport extends BaseTcpTransport {
}
}
debug(
this.log.debug(
'[%s:%d] connected to proxy, sending GREETING',
this._proxy.host,
this._proxy.port

View file

@ -13,7 +13,6 @@
},
"dependencies": {
"@mtcute/core": "^1.0.0",
"ip6": "^0.2.6",
"debug": "^4.3.1"
"ip6": "^0.2.6"
}
}

View file

@ -11,8 +11,7 @@ import { tl } from '@mtcute/tl'
import sqlite3 from 'better-sqlite3'
import bigInt from 'big-integer'
import { throttle } from '@mtcute/core'
const debug = require('debug')('mtcute:sqlite')
import { Logger } from '@mtcute/core/src/utils/logger'
function serializeAccessHash(hash: tl.Long): Buffer {
const arr = hash.toArray(256)
@ -181,6 +180,8 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ {
private _vacuumTimeout?: NodeJS.Timeout
private _vacuumInterval: number
private log!: Logger
/**
* @param filename Database file name, or `:memory:` for in-memory DB
* @param params
@ -302,6 +303,10 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ {
// todo: add support for workers (idk if really needed, but still)
}
setup(log: Logger): void {
this.log = log.create('sqlite')
}
private _readFullPeer(data: Buffer): tl.TypeUser | tl.TypeChat | null {
// reuse reader because why not
this._reader.pos = 0
@ -368,14 +373,14 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ {
// tables already exist, check version
this._initializeStatements()
const version = this._getFromKv('ver')
debug('current db version = %d', version)
this.log.debug('current db version = %d', version)
if (version < CURRENT_VERSION) {
this._upgradeDatabase(version)
this._setToKv('ver', CURRENT_VERSION, true)
}
} else {
// create tables
debug('creating tables')
this.log.debug('creating tables')
this._db.exec(SCHEMA)
this._initializeStatements()
this._setToKv('ver', CURRENT_VERSION, true)
@ -389,7 +394,7 @@ export class SqliteStorage implements ITelegramStorage /*, IStateStorage */ {
load(): void {
this._db = sqlite3(this._filename, {
verbose: debug.enabled ? debug : null,
verbose: this.log.mgr.level === 4 ? this.log.debug : undefined,
})
this._initialize()

View file

@ -15,8 +15,7 @@
"@mtcute/core": "^1.0.0",
"@mtcute/tl": "~1.131.0",
"better-sqlite3": "^7.4.0",
"big-integer": "1.6.48",
"debug": "^4.3.1"
"big-integer": "1.6.48"
},
"devDependencies": {
"@types/sqlite3": "^3.1.7",