Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 | import type { StreamTransportOptions, TransportInstanceLog } from './type' import { isStream } from '@https-enable/utils' import { MESSAGE } from '../triple-beam' import { Transport } from './base' export class StreamTransport extends Transport<StreamTransportOptions> { stream: NodeJS.WritableStream isObjectMode: boolean constructor(options: StreamTransportOptions = {}) { super(options) if (!options.stream || !isStream(options.stream)) { throw new Error('options.stream is required.') } this.name = options.name || 'console' // We need to listen for drain events when write() returns false. This can // make node mad at times. this.stream = options.stream this.isObjectMode = options.stream._writableState?.objectMode || false // 确保不会触发 EventEmitter 内存泄漏警告 this.stream.setMaxListeners(Infinity) } log: TransportInstanceLog = (info, callback, options, force) => { if (!force && !this.shouldLog(info.level)) return callback?.() const handleComplete = (error?: Error | null) => { if (error) return callback?.(error) callback?.() } try { info = this.selfFormat(options) || info let payload: any if (this.isObjectMode) payload = { ...info } else payload = `${info[MESSAGE]}${this.eol}` // 处理背压和异步写入 const canWrite = this.stream.write(payload, (error) => { handleComplete(error || undefined) }) if (!canWrite) { this.stream.once('drain', () => handleComplete()) } } catch (error) { handleComplete(error instanceof Error ? error : new Error(String(error))) } } } |