// Games Cupboard / build / js / node_modules / log4js / lib / appenders / multiprocess.js

const debug = require('debug')('log4js:multiprocess');
const net = require('net');
const LoggingEvent = require('../LoggingEvent');

// Delimiter the worker writes after every serialised event; the master
// searches for it to split the incoming socket stream back into events.
const END_MSG = '__LOG4JS__';

/**
 * Creates the master-side appender: a TCP server, listening on
 * config.loggerPort (default 5000) and config.loggerHost (default
 * 'localhost'), that receives serialised log events from workers and
 * forwards each one to `actualAppender`.
 *
 * @param {object} config - reads `loggerPort` and `loggerHost`.
 * @param {Function} actualAppender - called with every logging event,
 *   whether it arrived over a socket or was logged locally.
 * @param {object} levels - `levels.ERROR` is used for socket-error events.
 * @returns {Function} appender function; its `shutdown(cb)` closes the server.
 */
function logServer(config, actualAppender, levels) {
  /**
   * Takes a utf-8 string, returns an object with
   * the correct log properties, tagged with the sender's address and port.
   */
  function deserializeLoggingEvent(clientSocket, msg) {
    debug('(master) deserialising log event');
    const loggingEvent = LoggingEvent.deserialise(msg);
    loggingEvent.remoteAddress = clientSocket.remoteAddress;
    loggingEvent.remotePort = clientSocket.remotePort;

    return loggingEvent;
  }

  const server = net.createServer((clientSocket) => {
    debug('(master) connection received');
    clientSocket.setEncoding('utf8');
    // Text received on this connection that has not yet been terminated
    // by END_MSG (an event may be split across several chunks).
    let logMessage = '';

    function logTheMessage(msg) {
      debug('(master) deserialising log event and sending to actual appender');
      actualAppender(deserializeLoggingEvent(clientSocket, msg));
    }

    /**
     * Appends `chunk` (absent when called for 'end') to the pending text and
     * forwards every complete, END_MSG-terminated event it now contains.
     */
    function chunkReceived(chunk) {
      debug('(master) chunk of data received');
      logMessage += chunk || '';
      // Iterate rather than recurse: one chunk may carry many events, and
      // the stack depth must not depend on how many arrived together.
      let delimiterAt = logMessage.indexOf(END_MSG);
      while (delimiterAt > -1) {
        // Dispatch before advancing, so an event whose appender throws is
        // still at the head of the pending text.
        logTheMessage(logMessage.slice(0, delimiterAt));
        logMessage = logMessage.slice(delimiterAt + END_MSG.length);
        delimiterAt = logMessage.indexOf(END_MSG);
      }
    }

    /**
     * Reports a socket error to the actual appender as an ERROR-level
     * event in the 'log4js' category.
     */
    function handleError(error) {
      const loggingEvent = {
        startTime: new Date(),
        categoryName: 'log4js',
        level: levels.ERROR,
        data: ['A worker log process hung up unexpectedly', error],
        remoteAddress: clientSocket.remoteAddress,
        remotePort: clientSocket.remotePort,
      };
      actualAppender(loggingEvent);
    }

    clientSocket.on('data', chunkReceived);
    // 'end' passes no chunk; any trailing text without END_MSG is not forwarded.
    clientSocket.on('end', chunkReceived);
    clientSocket.on('error', handleError);
  });

  server.listen(
    config.loggerPort || 5000,
    config.loggerHost || 'localhost',
    (e) => {
      debug('(master) master server listening, error was ', e);
      // allow the process to exit, if this is the only socket active
      server.unref();
    }
  );

  // Events logged inside the master process bypass the socket entirely.
  function app(event) {
    debug('(master) log event sent directly to actual appender (local event)');
    return actualAppender(event);
  }

  app.shutdown = function (cb) {
    debug('(master) master shutdown called, closing server');
    server.close(cb);
  };

  return app;
}

/**
 * Creates the worker-side appender: it connects to config.loggerHost
 * (default 'localhost') on config.loggerPort (default 5000) and writes each
 * logging event to that socket, followed by END_MSG. Events logged while the
 * socket is not writable are held in an in-memory buffer.
 *
 * @param {object} config - reads `loggerHost` and `loggerPort`.
 * @returns {Function} appender function with a `shutdown(cb)` method.
 */
function workerAppender(config) {
  const buffer = [];
  let socket;
  let canWrite = false;
  // Number of 100ms retries shutdown() still has while the buffer is non-empty.
  let shutdownAttempts = 3;

  // Sends one serialised event and then the delimiter.
  const write = (loggingEvent) => {
    debug('(worker) Writing log event to socket');
    socket.write(loggingEvent.serialise(), 'utf8');
    socket.write(END_MSG, 'utf8');
  };

  // Writes buffered events in arrival order; stops at the first falsy entry.
  const emptyBuffer = () => {
    debug('(worker) emptying worker buffer');
    for (let pending = buffer.shift(); pending; pending = buffer.shift()) {
      write(pending);
    }
  };

  // Opens a connection to the master and wires up its lifecycle handlers.
  function createSocket() {
    const host = config.loggerHost || 'localhost';
    const port = config.loggerPort || 5000;
    debug(`(worker) worker appender creating socket to ${host}:${port}`);

    socket = net.createConnection(port, host);
    socket.on('connect', () => {
      debug('(worker) worker socket connected');
      emptyBuffer();
      canWrite = true;
    });
    socket.on('timeout', socket.end.bind(socket));
    socket.on('error', (e) => {
      debug('connection error', e);
      canWrite = false;
      // NOTE(review): this writes the buffered events to the socket that
      // just reported an error - confirm that is the intended behaviour.
      emptyBuffer();
    });
    // NOTE(review): reconnects as soon as the socket closes, with no delay.
    socket.on('close', createSocket);
  }

  createSocket();

  function log(loggingEvent) {
    if (!canWrite) {
      debug(
        '(worker) worker buffering log event because it cannot write at the moment'
      );
      buffer.push(loggingEvent);
      return;
    }
    write(loggingEvent);
  }

  log.shutdown = function (cb) {
    debug('(worker) worker shutdown called');
    const shouldWait = buffer.length && shutdownAttempts;
    if (!shouldWait) {
      // Drop the 'close' listener first so ending the socket does not reconnect.
      socket.removeAllListeners('close');
      socket.end(cb);
      return;
    }
    debug('(worker) worker buffer has items, waiting 100ms to empty');
    shutdownAttempts -= 1;
    setTimeout(() => {
      log.shutdown(cb);
    }, 100);
  };

  return log;
}

/**
 * Picks the appender implementation for this process: the log server when
 * config.mode is 'master', the socket-writing worker appender otherwise.
 *
 * @param {object} config - appender configuration; `mode` selects the role.
 * @param {Function} appender - appender handed to the log server (master only).
 * @param {object} levels - levels object handed to the log server (master only).
 * @returns {Function} whatever the chosen factory returns.
 */
function createAppender(config, appender, levels) {
  const isMaster = config.mode === 'master';
  debug(isMaster ? 'Creating master appender' : 'Creating worker appender');
  return isMaster
    ? logServer(config, appender, levels)
    : workerAppender(config);
}

/**
 * Validates the configuration and builds the multiprocess appender.
 * In 'master' mode the appender named by config.appender is looked up with
 * findAppender and passed on; in any other mode no lookup is performed.
 *
 * @param {object} config - reads `mode` and, for the master, `appender`.
 * @param {object} layouts - not used by this appender.
 * @param {Function} findAppender - resolves an appender name to an appender.
 * @param {object} levels - passed through to createAppender.
 * @returns {Function} the appender produced by createAppender.
 * @throws {Error} in master mode when config.appender is missing or when
 *   findAppender returns nothing for it.
 */
function configure(config, layouts, findAppender, levels) {
  debug(`configure with mode = ${config.mode}`);

  // Workers need no downstream appender.
  if (config.mode !== 'master') {
    return createAppender(config, undefined, levels);
  }

  if (!config.appender) {
    debug(`no appender found in config ${config}`);
    throw new Error('multiprocess master must have an "appender" defined');
  }

  debug(`actual appender is ${config.appender}`);
  const target = findAppender(config.appender);
  if (!target) {
    debug(`actual appender "${config.appender}" not found`);
    throw new Error(
      `multiprocess master appender "${config.appender}" not defined`
    );
  }

  return createAppender(config, target, levels);
}

// Expose `configure` on this module's exports.
module.exports.configure = configure;