const debug = require("debug")("streamroller:RollingFileWriteStream");
const fs = require("fs-extra");
const path = require("path");
const os = require("os");
const newNow = require("./now");
const format = require("date-format");
const { Writable } = require("stream");
const fileNameFormatter = require("./fileNameFormatter");
const fileNameParser = require("./fileNameParser");
const moveAndMaybeCompressFile = require("./moveAndMaybeCompressFile");
/**
 * Best-effort removal of a list of files.
 *
 * Every path is unlinked independently; a failing unlink is only reported
 * through `debug` and never rejects the combined promise.
 *
 * @param {string[]} fileNames - Paths of the files to unlink.
 * @returns {Promise<Array>} Settles once every unlink has been attempted.
 */
const deleteFiles = fileNames => {
  debug(`deleteFiles: files to delete: ${fileNames}`);

  // Unlink a single file, turning any failure into a debug message.
  const unlinkIgnoringErrors = f =>
    fs.unlink(f).catch((e) => {
      debug(`deleteFiles: error when unlinking ${f}, ignoring. Error was ${e}`);
    });

  const pendingUnlinks = fileNames.map(f => unlinkIgnoringErrors(f));
  return Promise.all(pendingUnlinks);
};
/**
 * RollingFileWriteStream is mainly used when writing to a file rolling by date or size.
 * RollingFileWriteStream inherits from stream.Writable
 *
 * Data is written to a "hot" file (index 0). When the formatted date changes
 * or the hot file reaches `maxSize`, existing files are shifted up by one
 * index, a fresh hot file is opened and the oldest files beyond `numToKeep`
 * are deleted.
 */
class RollingFileWriteStream extends Writable {
  /**
   * Create a RollingFileWriteStream
   * @constructor
   * @param {string} filePath - The file path to write. A leading `~` followed by the
   *                 path separator is expanded to the user's home directory.
   * @param {object} options - The extra options (also passed to the stream.Writable constructor)
   * @param {number} options.numToKeep - The max numbers of files to keep (the hot file included).
   *                 The default is Number.MAX_SAFE_INTEGER. Must be > 0.
   * @param {number} options.numBackups - The number of backup files to keep. When given it
   *                 supersedes options.numToKeep (numToKeep becomes numBackups + 1).
   *                 Must be >= 0 and < Number.MAX_SAFE_INTEGER.
   * @param {number} options.maxSize - The maxSize one file can reach. Unit is Byte.
   *                 Values above 1024 are recommended, but this is not enforced here.
   *                 The default is 0. If not specified or 0, then no size-based rolling
   *                 will happen. Negative values throw.
   * @param {string} options.encoding - The encoding of the files. The default is 'utf8'.
   * @param {string} options.mode - The mode of the files. The default is '0600'. Refer to stream.writable for more.
   * @param {string} options.flags - The default is 'a'. Refer to stream.flags for more.
   *                 With an append flag ('a', 'a+', 'as', 'as+') the size and mtime of an
   *                 already existing file seed the rolling state.
   * @param {boolean} options.compress - Whether to compress backup files.
   * @param {boolean} options.keepFileExt - Whether to keep the file extension.
   * @param {string} options.pattern - The date string pattern in the file name.
   * @param {boolean} options.alwaysIncludePattern - Whether to add date to the name of the first file.
   * @param {string} options.fileNameSep - Forwarded unchanged to the file name formatter and parser.
   * @throws {Error} If filePath is not a non-empty string, ends with the path separator,
   *                 or if an option fails validation.
   */
  constructor(filePath, options) {
    debug(`constructor: creating RollingFileWriteStream. path=${filePath}`);
    if (typeof filePath !== "string" || filePath.length === 0) {
      throw new Error(`Invalid filename: ${filePath}`);
    } else if (filePath.endsWith(path.sep)) {
      throw new Error(`Filename is a directory: ${filePath}`);
    } else if (filePath.indexOf(`~${path.sep}`) === 0) {
      // handle ~ expansion: https://github.com/nodejs/node/issues/684
      // exclude ~ and ~filename as these can be valid files
      filePath = filePath.replace("~", os.homedir());
    }
    super(options);
    this.options = this._parseOption(options);
    this.fileObject = path.parse(filePath);
    if (this.fileObject.dir === "") {
      // bare file name: anchor it to the current working directory
      this.fileObject = path.parse(path.join(process.cwd(), filePath));
    }
    this.fileFormatter = fileNameFormatter({
      file: this.fileObject,
      alwaysIncludeDate: this.options.alwaysIncludePattern,
      // maxSize is removed by _parseOption when unset/0, which makes this false
      needsIndex: this.options.maxSize < Number.MAX_SAFE_INTEGER,
      compress: this.options.compress,
      keepFileExt: this.options.keepFileExt,
      fileNameSep: this.options.fileNameSep
    });
    this.fileNameParser = fileNameParser({
      file: this.fileObject,
      keepFileExt: this.options.keepFileExt,
      pattern: this.options.pattern,
      fileNameSep: this.options.fileNameSep
    });
    this.state = {
      currentSize: 0
    };
    if (this.options.pattern) {
      this.state.currentDate = format(this.options.pattern, newNow());
    }
    this.filename = this.fileFormatter({
      index: 0,
      date: this.state.currentDate
    });
    // when appending, continue from whatever is already on disk
    if (["a", "a+", "as", "as+"].includes(this.options.flags)) {
      this._setExistingSizeAndDate();
    }
    debug(
      `constructor: create new file ${this.filename}, state=${JSON.stringify(
        this.state
      )}`
    );
    this._renewWriteStream();
  }

  /**
   * Seed the state from an existing hot file: its size becomes currentSize and,
   * when a date pattern is configured, its mtime becomes currentDate.
   * Any error from stat (e.g. the file does not exist) is ignored.
   */
  _setExistingSizeAndDate() {
    try {
      const stats = fs.statSync(this.filename);
      this.state.currentSize = stats.size;
      if (this.options.pattern) {
        this.state.currentDate = format(this.options.pattern, stats.mtime);
      }
    } catch (e) {
      //file does not exist, that's fine - move along
      return;
    }
  }

  /**
   * Merge the raw options over the defaults and validate them.
   * @param {object} rawOptions - The options given to the constructor (may be undefined).
   * @returns {object} A new options object; rawOptions is not modified.
   * @throws {Error} If maxSize, numBackups or numToKeep is out of range.
   */
  _parseOption(rawOptions) {
    const defaultOptions = {
      maxSize: 0,
      numToKeep: Number.MAX_SAFE_INTEGER,
      encoding: "utf8",
      mode: parseInt("0600", 8),
      flags: "a",
      compress: false,
      keepFileExt: false,
      alwaysIncludePattern: false
    };
    const options = Object.assign({}, defaultOptions, rawOptions);
    if (!options.maxSize) {
      // unset / 0 means "never roll by size": drop the key so comparisons against it are false
      delete options.maxSize;
    } else if (options.maxSize <= 0) {
      throw new Error(`options.maxSize (${options.maxSize}) should be > 0`);
    }
    // options.numBackups will supersede options.numToKeep
    if (options.numBackups || options.numBackups === 0) {
      if (options.numBackups < 0) {
        throw new Error(`options.numBackups (${options.numBackups}) should be >= 0`);
      } else if (options.numBackups >= Number.MAX_SAFE_INTEGER) {
        // to cater for numToKeep (include the hot file) at Number.MAX_SAFE_INTEGER
        throw new Error(`options.numBackups (${options.numBackups}) should be < Number.MAX_SAFE_INTEGER`);
      } else {
        options.numToKeep = options.numBackups + 1;
      }
    } else if (options.numToKeep <= 0) {
      throw new Error(`options.numToKeep (${options.numToKeep}) should be > 0`);
    }
    debug(
      `_parseOption: creating stream with option=${JSON.stringify(options)}`
    );
    return options;
  }

  /**
   * stream.Writable hook: end the underlying file stream before this stream finishes.
   * @param {function} callback - Passed to the file stream's end().
   */
  _final(callback) {
    this.currentFileStream.end("", this.options.encoding, callback);
  }

  /**
   * stream.Writable hook: roll the file if required, then write the chunk to
   * the current file stream and add its length to state.currentSize.
   * @param {Buffer|string} chunk - The data to write.
   * @param {string} encoding - The encoding of chunk.
   * @param {function} callback - Called with the write error (if any), or with
   *                   the error that made rolling fail.
   */
  _write(chunk, encoding, callback) {
    this._shouldRoll()
      .then(() => {
        debug(
          `_write: writing chunk. ` +
            `file=${this.currentFileStream.path} ` +
            `state=${JSON.stringify(this.state)} ` +
            `chunk=${chunk}`
        );
        this.currentFileStream.write(chunk, encoding, e => {
          this.state.currentSize += chunk.length;
          callback(e);
        });
      })
      // a failed roll must reach the Writable machinery; otherwise the callback
      // is never invoked (the stream stalls) and the rejection goes unhandled
      .catch(callback);
  }

  /**
   * Roll the files when the date changed or the hot file is too big.
   * @returns {Promise<void>} Resolves once any required roll has completed.
   */
  async _shouldRoll() {
    if (this._dateChanged() || this._tooBig()) {
      debug(
        `_shouldRoll: rolling because dateChanged? ${this._dateChanged()} or tooBig? ${this._tooBig()}`
      );
      await this._roll();
    }
  }

  /**
   * @returns {boolean|undefined|null|string} Truthy when a current date is tracked and it
   *          differs from the current time formatted with options.pattern.
   */
  _dateChanged() {
    return (
      this.state.currentDate &&
      this.state.currentDate !== format(this.options.pattern, newNow())
    );
  }

  /**
   * @returns {boolean} true when currentSize has reached options.maxSize
   *          (always false when maxSize is not set).
   */
  _tooBig() {
    return this.state.currentSize >= this.options.maxSize;
  }

  /**
   * End the current file stream, then shift the existing files.
   * @returns {Promise<void>} Settles with the outcome of _moveOldFiles().
   */
  _roll() {
    debug(`_roll: closing the current stream`);
    return new Promise((resolve, reject) => {
      this.currentFileStream.end("", this.options.encoding, () => {
        this._moveOldFiles()
          .then(resolve)
          .catch(reject);
      });
    });
  }

  /**
   * Shift every file of the current date up by one index (the highest index first,
   * so nothing is overwritten), reset the state, open a fresh hot file and
   * finally delete surplus files.
   * @returns {Promise<void>}
   */
  async _moveOldFiles() {
    const files = await this._getExistingFiles();
    const todaysFiles = this.state.currentDate
      ? files.filter(f => f.date === this.state.currentDate)
      : files;
    for (let i = todaysFiles.length; i >= 0; i--) {
      debug(`_moveOldFiles: i = ${i}`);
      const sourceFilePath = this.fileFormatter({
        date: this.state.currentDate,
        index: i
      });
      const targetFilePath = this.fileFormatter({
        date: this.state.currentDate,
        index: i + 1
      });
      const moveAndCompressOptions = {
        // only the hot file (index 0) is compressed while being moved
        compress: this.options.compress && i === 0,
        mode: this.options.mode
      };
      await moveAndMaybeCompressFile(
        sourceFilePath,
        targetFilePath,
        moveAndCompressOptions
      );
    }
    this.state.currentSize = 0;
    this.state.currentDate = this.state.currentDate
      ? format(this.options.pattern, newNow())
      : null;
    debug(
      `_moveOldFiles: finished rolling files. state=${JSON.stringify(
        this.state
      )}`
    );
    this._renewWriteStream();
    // wait for the file to be open before cleaning up old ones,
    // otherwise the numToKeep calculations can be off
    await new Promise((resolve, reject) => {
      this.currentFileStream.write("", "utf8", () => {
        this._clean()
          .then(resolve)
          .catch(reject);
      });
    });
  }

  /**
   * List the files in the target directory that the file name parser recognises.
   * Sorted from the oldest to the latest: the sort key is the parsed timestamp
   * (or the current time when the parser gave none) minus the file index.
   * A failing readdir is treated as an empty directory.
   * @returns {Promise<object[]>} The parsed file details, in ascending key order.
   */
  async _getExistingFiles() {
    const files = await fs.readdir(this.fileObject.dir)
      .catch( /* istanbul ignore next: will not happen on windows */ () => []);
    debug(`_getExistingFiles: files=${files}`);
    const existingFileDetails = files
      .map(n => this.fileNameParser(n))
      .filter(n => n);
    // read the clock once so every comparison in this sort uses the same value
    const fallbackTimestamp = newNow().getTime();
    const getKey = n =>
      (n.timestamp ? n.timestamp : fallbackTimestamp) - n.index;
    existingFileDetails.sort((a, b) => getKey(a) - getKey(b));
    return existingFileDetails;
  }

  /**
   * Make sure the target directory exists and open a new write stream on the
   * hot file (index 0 of the current date). Errors emitted by the file stream
   * are re-emitted on this stream.
   * @throws {Error} Synchronously, when the directory cannot be created or the
   *                 file cannot be opened for appending.
   */
  _renewWriteStream() {
    const filePath = this.fileFormatter({
      date: this.state.currentDate,
      index: 0
    });
    // attempt to create the directory
    const mkdir = (dir) => {
      try {
        return fs.mkdirSync(dir, { recursive: true });
      }
      // backward-compatible fs.mkdirSync for nodejs pre-10.12.0 (without recursive option)
      catch (e) {
        // recursive creation of parent first
        if (e.code === "ENOENT") {
          mkdir(path.dirname(dir));
          return mkdir(dir);
        }
        // throw error for all except EEXIST and EROFS (read-only filesystem)
        if (e.code !== "EEXIST" && e.code !== "EROFS") {
          throw e;
        }
        // EEXIST: throw if file and not directory
        // EROFS : throw if directory not found
        else {
          try {
            if (fs.statSync(dir).isDirectory()) {
              return dir;
            }
            throw e;
          } catch (err) {
            // always surface the original mkdir error, not the stat error
            throw e;
          }
        }
      }
    };
    mkdir(this.fileObject.dir);
    const ops = {
      flags: this.options.flags,
      encoding: this.options.encoding,
      mode: this.options.mode
    };
    // appendFileSync is given "flag" where createWriteStream is given "flags"
    const renameKey = function(obj, oldKey, newKey) {
      obj[newKey] = obj[oldKey];
      delete obj[oldKey];
      return obj;
    };
    // try to throw EISDIR, EROFS, EACCES
    fs.appendFileSync(filePath, "", renameKey({ ...ops }, "flags", "flag"));
    this.currentFileStream = fs.createWriteStream(filePath, ops);
    this.currentFileStream.on("error", e => {
      this.emit("error", e);
    });
  }

  /**
   * Delete the oldest files so that at most options.numToKeep remain.
   * @returns {Promise<void>}
   */
  async _clean() {
    const existingFileDetails = await this._getExistingFiles();
    debug(
      `_clean: numToKeep = ${this.options.numToKeep}, existingFiles = ${existingFileDetails.length}`
    );
    debug("_clean: existing files are: ", existingFileDetails);
    if (this._tooManyFiles(existingFileDetails.length)) {
      // the list is sorted oldest first, so the surplus sits at the front
      const fileNamesToRemove = existingFileDetails
        .slice(0, existingFileDetails.length - this.options.numToKeep)
        .map(f => path.format({ dir: this.fileObject.dir, base: f.filename }));
      await deleteFiles(fileNamesToRemove);
    }
  }

  /**
   * @param {number} numFiles - The number of existing files.
   * @returns {boolean} true when numToKeep is positive and numFiles exceeds it.
   */
  _tooManyFiles(numFiles) {
    return this.options.numToKeep > 0 && numFiles > this.options.numToKeep;
  }
}
module.exports = RollingFileWriteStream;