All files / packages/logger/src/transports stream.ts

0% Statements 0/42
100% Branches 1/1
100% Functions 1/1
0% Lines 0/42

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62                                                                                                                           
import type { StreamTransportOptions, TransportInstanceLog } from './type'
import { isStream } from '@https-enable/utils'
import { MESSAGE } from '../triple-beam'
import { Transport } from './base'
 
/**
 * Transport that writes every log entry to a caller-supplied writable stream.
 *
 * When the stream was detected as object-mode at construction time, a shallow
 * copy of the info object is written; otherwise the `info[MESSAGE]` string
 * followed by `this.eol` is written.
 */
export class StreamTransport extends Transport<StreamTransportOptions> {
  /** Destination stream, taken from `options.stream`. */
  stream: NodeJS.WritableStream
  /** True when `options.stream._writableState.objectMode` was truthy at construction. */
  isObjectMode: boolean

  /**
   * @param options Transport options; `options.stream` is mandatory.
   * @throws {Error} When `options.stream` is missing or rejected by `isStream`.
   */
  constructor(options: StreamTransportOptions = {}) {
    super(options)

    if (!options.stream || !isStream(options.stream)) {
      throw new Error('options.stream is required.')
    }

    // NOTE(review): the fallback name is 'console' even though this is the
    // stream transport; kept unchanged because callers may look it up by name.
    this.name = options.name || 'console'

    this.stream = options.stream
    this.isObjectMode = options.stream._writableState?.objectMode || false

    // Lift the listener cap so that 'drain' listeners registered by concurrent
    // log calls under backpressure do not trigger the EventEmitter
    // memory-leak warning.
    this.stream.setMaxListeners(Infinity)
  }

  /**
   * Writes one log entry to the stream.
   *
   * `callback` is invoked at most once per call: with no argument on success
   * (or when the entry is skipped), or with the error when the write fails.
   *
   * @param info Log entry; `info.level` is checked with `shouldLog` and
   *   `info[MESSAGE]` is the text written in non-object mode.
   * @param callback Optional completion callback.
   * @param options Passed through to `selfFormat`; a truthy result of that
   *   call replaces `info`.
   * @param force When truthy, the `shouldLog` level check is bypassed.
   */
  log: TransportInstanceLog = (info, callback, options, force) => {
    if (!force && !this.shouldLog(info.level))
      return callback?.()

    // Under backpressure both the write callback and the 'drain' event report
    // completion of the same write. The flag guarantees `callback` runs only
    // once, and the drain listener is detached so it cannot linger on the
    // stream when 'drain' never fires (e.g. the write failed).
    let settled = false
    const handleComplete = (error?: Error | null) => {
      if (settled)
        return
      settled = true
      this.stream.removeListener('drain', onDrain)

      if (error)
        return callback?.(error)

      callback?.()
    }
    const onDrain = () => handleComplete()

    try {
      info = this.selfFormat(options) || info

      let payload: any
      if (this.isObjectMode)
        payload = { ...info }
      else
        payload = `${info[MESSAGE]}${this.eol}`

      // Asynchronous write; a `false` return value signals backpressure.
      const canWrite = this.stream.write(payload, (error) => {
        handleComplete(error || undefined)
      })

      // Only wait for 'drain' if the write callback has not already settled
      // this call synchronously.
      if (!canWrite && !settled) {
        this.stream.once('drain', onDrain)
      }
    }
    catch (error) {
      handleComplete(error instanceof Error ? error : new Error(String(error)))
    }
  }
}