我正在尝试一次读取一个大文件。我找到了处理该主题的 a question on Quora,但我缺少一些联系以使整个事情结合在一起。
var Lazy=require("lazy");
new Lazy(process.stdin)
.lines
.forEach(
function(line) {
console.log(line.toString());
}
);
process.stdin.resume();
我想弄清楚的一点是,我如何一次从文件中读取一行,而不是像本示例中那样从 STDIN 读取。
我试过了:
fs.open('./VeryBigFile.csv', 'r', '0666', Process);
function Process(err, fd) {
if (err) throw err;
// DO lazy read
}
但它不起作用。我知道在紧要关头我可以回退到使用 PHP 之类的东西,但我想弄清楚这一点。
我不认为其他答案会起作用,因为该文件比我运行它的服务器大得多。
fs.readSync()
是相当困难的。您可以将二进制八位字节读入缓冲区,但在将缓冲区转换为 JavaScript 字符串并扫描 EOL 之前,如果不检查缓冲区,则无法轻松处理部分 UTF-8 或 UTF-16 字符。 Buffer()
类型没有像原生字符串那样丰富的函数集来对其实例进行操作,但原生字符串不能包含二进制数据。在我看来,缺乏从任意文件句柄读取文本行的内置方法是 node.js 中的一个真正差距。
if (line.length==1 && line[0] == 48) special(line);
node
的 API 文档 github.com/nodejs/node/pull/4609 中稍作修改
从 Node.js v0.12 和 Node.js v4.0.0 开始,有一个稳定的 readline 核心模块。这是从文件中读取行的最简单方法,无需任何外部模块:
const fs = require('fs');
const readline = require('readline');
async function processLineByLine() {
const fileStream = fs.createReadStream('input.txt');
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
});
// Note: we use the crlfDelay option to recognize all instances of CR LF
// ('\r\n') in input.txt as a single line break.
for await (const line of rl) {
// Each line in input.txt will be successively available here as `line`.
console.log(`Line from file: ${line}`);
}
}
processLineByLine();
或者:
var lineReader = require('readline').createInterface({
input: require('fs').createReadStream('file.in')
});
lineReader.on('line', function (line) {
console.log('Line from file:', line);
});
最后一行被正确读取(从 Node v0.12 或更高版本开始),即使没有最终的 \n
。
UPDATE:此示例为 added to Node's API official documentation。
对于这样一个简单的操作,不应该对第三方模块有任何依赖。放轻松。
var fs = require('fs'),
readline = require('readline');
var rd = readline.createInterface({
input: fs.createReadStream('/path/to/file'),
output: process.stdout,
console: false
});
rd.on('line', function(line) {
console.log(line);
});
line
事件仅在点击 \n
之后出现,即,所有备选方案都被遗漏了(参见 unicode.org/reports/tr18/#Line_Boundaries)。 #2,最后一个 \n
之后的数据被静默忽略(参见 stackoverflow.com/questions/18450197/…)。我会将此解决方案称为危险,因为它适用于 99% 的所有文件和 99% 的数据,但对于其余部分静默失败。每当您执行 fs.writeFileSync( path, lines.join('\n'))
时,您已经编写了一个只能被上述解决方案部分读取的文件。
readline
包的行为方式确实很奇怪。
rd.on("close", ..);
可用作回调(读取所有行时发生)
您不必open
文件,而是必须创建一个ReadStream
。
然后将该流传递给 Lazy
new lazy(fs.createReadStream('...')).lines.forEach(function(l) { /* ... */ }).join(function() { /* Done */ })
new lazy(...).lines.forEach(...).on('end', function() {...})
.on('end'...
.forEach(...)
,而事实上,当我绑定事件时,一切都按预期运行 首先。
2019年更新
一个很棒的例子已经发布在官方的 Nodejs 文档上。 here
这需要在您的机器上安装最新的 Nodejs。 >11.4
const fs = require('fs');
const readline = require('readline');
async function processLineByLine() {
const fileStream = fs.createReadStream('input.txt');
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
});
// Note: we use the crlfDelay option to recognize all instances of CR LF
// ('\r\n') in input.txt as a single line break.
for await (const line of rl) {
// Each line in input.txt will be successively available here as `line`.
console.log(`Line from file: ${line}`);
}
}
processLineByLine();
createInterface()
调用和 for await
循环开始之间有任何 await
,那么您将神秘地从文件。 createInterface()
立即开始在幕后发射行,而使用 const line of rl
隐式创建的异步迭代器在创建之前无法开始侦听这些行。
require('fs').readFileSync('file.txt', 'utf-8').split(/\r?\n/).forEach(function(line){
console.log(line);
})
有一个非常好的模块可以逐行读取文件,它叫做 line-reader
有了它,你只需写:
var lineReader = require('line-reader');
lineReader.eachLine('file.txt', function(line, last) {
console.log(line);
// do whatever you want with line...
if(last){
// or check if it's the last one
}
});
如果您需要更多控制,您甚至可以使用“java-style”接口迭代文件:
lineReader.open('file.txt', function(reader) {
if (reader.hasNextLine()) {
reader.nextLine(function(line) {
console.log(line);
});
}
});
process/stdin
)。至少,如果可以的话,阅读代码并尝试它肯定不是很明显。
readline
core module 从文件中读取行的内置方法。
function(reader)
和 function(line)
应该是:function(err,reader)
和 function(err,line)
。
line-reader
异步读取文件。它的同步替代方案是 line-reader-sync
老话题,但这有效:
var rl = readline.createInterface({
input : fs.createReadStream('/path/file.txt'),
output: process.stdout,
terminal: false
})
rl.on('line',function(line){
console.log(line) //or parse line
})
简单的。无需外部模块。
readline is not defined
或 fs is not defined
,请添加 var readline = require('readline');
和 var fs = require('fs');
以使其正常工作。否则甜蜜,甜蜜的代码。谢谢。
您可以随时使用自己的线路阅读器。我还没有对这个片段进行基准测试,但它正确地将传入的块流拆分为没有尾随 '\n' 的行
var last = "";
process.stdin.on('data', function(chunk) {
var lines, i;
lines = (last+chunk).split("\n");
for(i = 0; i < lines.length - 1; i++) {
console.log("line: " + lines[i]);
}
last = lines[i];
});
process.stdin.on('end', function() {
console.log("line: " + last);
});
process.stdin.resume();
在处理需要在日志解析期间累积数据的快速日志解析脚本时,我确实想到了这一点,我觉得尝试使用 js 和 node 而不是使用 perl 或 bash 会很好。
无论如何,我确实觉得小的 nodejs 脚本应该是自包含的,而不是依赖第三方模块,所以在阅读了这个问题的所有答案之后,每个使用各种模块来处理行解析,一个 13 SLOC 本机 nodejs 解决方案可能是有趣的。
stdin
之外,似乎没有任何简单的方法可以将其扩展为使用任意文件......除非我错过了一些东西。
fs.createReadStream('./myBigFile.csv')
创建一个 ReadStream
并使用它来代替 stdin
readline
core module 从文件中读取行的内置方法。
使用 carrier module:
var carrier = require('carrier');
process.stdin.resume();
carrier.carry(process.stdin, function(line) {
console.log('got one line: ' + line);
});
var inStream = fs.createReadStream('input.txt', {flags:'r'});
但您的语法比使用 .on() 的记录方法更简洁:carrier.carry(inStream).on('line', function(line) { ...
\r\n
和 \n
行结尾。如果您需要处理 OS X 之前的 MacOS 风格的测试文件,它们使用 \r
而运营商不处理这个问题。令人惊讶的是,仍然有这样的文件在野外漂浮。您可能还需要显式处理 Unicode BOM(字节顺序标记),这用于 MS Windows 影响范围内文本文件的开头。
readline
core module 从文件中读取行的内置方法。
由于节点中的排水/暂停/恢复的工作方式,我在尝试处理这些行并将它们写入另一个流时,使用 Lazy 逐行读取最终导致大量内存泄漏(请参阅:http://elegantcode.com/2011/04/06/taking-baby-steps-with-node-js-pumping-data-between-streams/(我顺便说一句,爱这个人))。我还没有仔细研究 Lazy 以了解确切原因,但我无法暂停我的读取流以允许在没有 Lazy 退出的情况下进行排水。
我编写了将大量 csv 文件处理为 xml 文档的代码,您可以在此处查看代码:https://github.com/j03m/node-csv2xml
如果您使用 Lazy line 运行以前的修订版,它会泄漏。最新版本根本没有泄漏,您可能可以将其用作阅读器/处理器的基础。虽然我有一些定制的东西在那里。
编辑:我想我还应该注意,我的 Lazy 代码运行良好,直到我发现自己编写了足够大的 xml 片段,这些片段因为必要而耗尽/暂停/恢复。对于较小的块,这很好。
readline
core module 可以更简单地从文件中读取行。
编辑:
使用 transform stream。
使用 BufferedReader,您可以读取行。
new BufferedReader ("lorem ipsum", { encoding: "utf8" })
.on ("error", function (error){
console.log ("error: " + error);
})
.on ("line", function (line){
console.log ("line: " + line);
})
.on ("end", function (){
console.log ("EOF");
})
.read ();
readline
core module 可以更简单地从文件中读取行。
在大多数情况下,这应该足够了:
const fs = require("fs")
fs.readFile('./file', 'utf-8', (err, file) => {
const lines = file.split('\n')
for (let line of lines)
console.log(line)
});
自从发布我的原始答案以来,我发现 split 是一个非常易于使用的节点模块,用于在文件中读取行;它也接受可选参数。
var split = require('split');
fs.createReadStream(file)
.pipe(split())
.on('data', function (line) {
//each chunk now is a seperate line!
});
尚未对非常大的文件进行测试。如果您这样做,请告诉我们。
function createLineReader(fileName){
var EM = require("events").EventEmitter
var ev = new EM()
var stream = require("fs").createReadStream(fileName)
var remainder = null;
stream.on("data",function(data){
if(remainder != null){//append newly received data chunk
var tmp = new Buffer(remainder.length+data.length)
remainder.copy(tmp)
data.copy(tmp,remainder.length)
data = tmp;
}
var start = 0;
for(var i=0; i<data.length; i++){
if(data[i] == 10){ //\n new line
var line = data.slice(start,i)
ev.emit("line", line)
start = i+1;
}
}
if(start<data.length){
remainder = data.slice(start);
}else{
remainder = null;
}
})
stream.on("end",function(){
if(null!=remainder) ev.emit("line",remainder)
})
return ev
}
//---------main---------------
fileName = process.argv[2]
lineReader = createLineReader(fileName)
lineReader.on("line",function(line){
console.log(line.toString())
//console.log("++++++++++++++++++++")
})
stream.on("data")
的调用中的 data
是否可能仅以多字节 UTF-8 字符的一部分开始或结束,例如 ა
,即 U+10D0
,由三个字节组成 { 5} 83
90
readline
core module 可以更简单地从文件中读取行。
我想解决同样的问题,基本上 Perl 中的内容是:
while (<>) {
process_line($_);
}
我的用例只是一个独立的脚本,而不是服务器,所以同步很好。这些是我的标准:
可以在许多项目中重用的最小同步代码。
文件大小或行数没有限制。
对行的长度没有限制。
能够处理 UTF-8 中的完整 Unicode,包括 BMP 之外的字符。
能够处理 *nix 和 Windows 行尾(我不需要旧式 Mac)。
要包含在行中的行尾字符。
能够处理带有或不带有行尾字符的最后一行。
不要使用任何未包含在 node.js 发行版中的外部库。
这是一个让我了解 node.js 中的低级脚本类型代码并确定它作为 Perl 等其他脚本语言替代品的可行性的项目。
经过惊人的努力和几次错误的开始,这就是我想出的代码。它非常快,但没有我预期的那么简单:(fork it on GitHub)
var fs = require('fs'),
StringDecoder = require('string_decoder').StringDecoder,
util = require('util');
function lineByLine(fd) {
var blob = '';
var blobStart = 0;
var blobEnd = 0;
var decoder = new StringDecoder('utf8');
var CHUNK_SIZE = 16384;
var chunk = new Buffer(CHUNK_SIZE);
var eolPos = -1;
var lastChunk = false;
var moreLines = true;
var readMore = true;
// each line
while (moreLines) {
readMore = true;
// append more chunks from the file onto the end of our blob of text until we have an EOL or EOF
while (readMore) {
// do we have a whole line? (with LF)
eolPos = blob.indexOf('\n', blobStart);
if (eolPos !== -1) {
blobEnd = eolPos;
readMore = false;
// do we have the last line? (no LF)
} else if (lastChunk) {
blobEnd = blob.length;
readMore = false;
// otherwise read more
} else {
var bytesRead = fs.readSync(fd, chunk, 0, CHUNK_SIZE, null);
lastChunk = bytesRead !== CHUNK_SIZE;
blob += decoder.write(chunk.slice(0, bytesRead));
}
}
if (blobStart < blob.length) {
processLine(blob.substring(blobStart, blobEnd + 1));
blobStart = blobEnd + 1;
if (blobStart >= CHUNK_SIZE) {
// blobStart is in characters, CHUNK_SIZE is in octets
var freeable = blobStart / CHUNK_SIZE;
// keep blob from growing indefinitely, not as deterministic as I'd like
blob = blob.substring(CHUNK_SIZE);
blobStart -= CHUNK_SIZE;
blobEnd -= CHUNK_SIZE;
}
} else {
moreLines = false;
}
}
}
它可能可以进一步清理,这是反复试验的结果。
基于生成器的行阅读器:https://github.com/neurosnap/gen-readlines
var fs = require('fs');
var readlines = require('gen-readlines');
fs.open('./file.txt', 'r', function(err, fd) {
if (err) throw err;
fs.fstat(fd, function(err, stats) {
if (err) throw err;
for (var line of readlines(fd, stats.size)) {
console.log(line.toString());
}
});
});
如果您想逐行读取文件并将其写入另一个文件:
var fs = require('fs');
var readline = require('readline');
var Stream = require('stream');
function readFileLineByLine(inputFile, outputFile) {
var instream = fs.createReadStream(inputFile);
var outstream = new Stream();
outstream.readable = true;
outstream.writable = true;
var rl = readline.createInterface({
input: instream,
output: outstream,
terminal: false
});
rl.on('line', function (line) {
fs.appendFileSync(outputFile, line + '\n');
});
};
var fs = require('fs');
function readfile(name,online,onend,encoding) {
var bufsize = 1024;
var buffer = new Buffer(bufsize);
var bufread = 0;
var fd = fs.openSync(name,'r');
var position = 0;
var eof = false;
var data = "";
var lines = 0;
encoding = encoding || "utf8";
function readbuf() {
bufread = fs.readSync(fd,buffer,0,bufsize,position);
position += bufread;
eof = bufread ? false : true;
data += buffer.toString(encoding,0,bufread);
}
function getLine() {
var nl = data.indexOf("\r"), hasnl = nl !== -1;
if (!hasnl && eof) return fs.closeSync(fd), online(data,++lines), onend(lines);
if (!hasnl && !eof) readbuf(), nl = data.indexOf("\r"), hasnl = nl !== -1;
if (!hasnl) return process.nextTick(getLine);
var line = data.substr(0,nl);
data = data.substr(nl+1);
if (data[0] === "\n") data = data.substr(1);
online(line,++lines);
process.nextTick(getLine);
}
getLine();
}
我遇到了同样的问题,并提出了上述解决方案,对其他人来说看起来很相似,但它是 aSync 并且可以非常快速地读取大文件
希望这会有所帮助
在进行此类操作时,我们必须问自己两个问题:
用于执行它的内存量是多少?内存消耗是否随着文件大小而急剧增加?
require('fs').readFileSync()
等解决方案将整个文件加载到内存中。这意味着执行操作所需的内存量几乎等于文件大小。对于大于 50mbs
的任何东西,我们都应该避免使用这些
我们可以通过在函数调用之后放置这些代码行来轻松跟踪函数使用的内存量:
const used = process.memoryUsage().heapUsed / 1024 / 1024;
console.log(
`The script uses approximately ${Math.round(used * 100) / 100} MB`
);
现在,从大文件中读取特定行的最佳方法是使用节点的 readline。该文档有惊人的 examples。
我有一个小模块,它做得很好,并被很多其他项目使用npm readline请注意,在节点 v10 中有一个本机 readline 模块,所以我将我的模块重新发布为 linebyline https://www.npmjs.com/package/linebyline
如果您不想使用该模块,该功能非常简单:
var fs = require('fs'),
EventEmitter = require('events').EventEmitter,
util = require('util'),
newlines = [
13, // \r
10 // \n
];
var readLine = module.exports = function(file, opts) {
if (!(this instanceof readLine)) return new readLine(file);
EventEmitter.call(this);
opts = opts || {};
var self = this,
line = [],
lineCount = 0,
emit = function(line, count) {
self.emit('line', new Buffer(line).toString(), count);
};
this.input = fs.createReadStream(file);
this.input.on('open', function(fd) {
self.emit('open', fd);
})
.on('data', function(data) {
for (var i = 0; i < data.length; i++) {
if (0 <= newlines.indexOf(data[i])) { // Newline char was found.
lineCount++;
if (line.length) emit(line, lineCount);
line = []; // Empty buffer.
} else {
line.push(data[i]); // Buffer new line data.
}
}
}).on('error', function(err) {
self.emit('error', err);
}).on('end', function() {
// Emit last line if anything left over since EOF won't trigger it.
if (line.length){
lineCount++;
emit(line, lineCount);
}
self.emit('end');
}).on('close', function() {
self.emit('close');
});
};
util.inherits(readLine, EventEmitter);
另一种解决方案是通过顺序执行器 nsynjs 运行逻辑。它使用 node readline 模块逐行读取文件,并且不使用承诺或递归,因此不会在大文件上失败。代码如下所示:
var nsynjs = require('nsynjs');
var textFile = require('./wrappers/nodeReadline').textFile; // this file is part of nsynjs
function process(textFile) {
var fh = new textFile();
fh.open('path/to/file');
var s;
while (typeof(s = fh.readLine(nsynjsCtx).data) != 'undefined')
console.log(s);
fh.close();
}
var ctx = nsynjs.run(process,{},textFile,function () {
console.log('done');
});
上面的代码基于此示例:https://github.com/amaksr/nsynjs/blob/master/examples/node-readline/index.js
这是我最喜欢的文件浏览方式,一种简单的本地解决方案,用于使用现代 async/await
读取的渐进式(而不是“slurp”或全内存方式)文件。这是我在处理大型文本文件时发现“自然”的解决方案,而无需求助于 readline
包或任何非核心依赖项。
let buf = '';
for await ( const chunk of fs.createReadStream('myfile') ) {
const lines = buf.concat(chunk).split(/\r?\n/);
buf = lines.pop();
for( const line of lines ) {
console.log(line);
}
}
if(buf.length) console.log(buf); // last line, if file does not end with newline
您可以在 fs.createReadStream
中调整编码或使用 chunk.toString(<arg>)
。这也让你更好地根据你的喜好微调分割线,即。使用 .split(/\n+/)
跳过空行并使用 { highWaterMark: <chunkSize> }
控制块大小。
不要忘记创建一个类似 processLine(line)
的函数,以避免由于结束 buf
剩余而重复行处理代码两次。不幸的是,ReadStream
实例在此设置中没有更新其文件结束标志,因此 afaik 无法在循环中检测到我们处于最后一次迭代中,而不需要一些更详细的技巧,例如比较文件fs.Stats()
和 .bytesRead
的大小。因此最终的 buf
处理解决方案,除非您绝对确定您的文件以换行符 \n
结尾,在这种情况下 for await
循环就足够了。
★ 如果你更喜欢事件异步版本,那就是:
let buf = '';
fs.createReadStream('myfile')
.on('data', chunk => {
const lines = buf.concat(chunk).split(/\r?\n/);
buf = lines.pop();
for( const line of lines ) {
console.log(line);
}
})
.on('end', () => buf.length && console.log(buf) );
★ 现在,如果您不介意导入 stream
核心包,那么这是等效的管道流版本,它允许链接转换,如 gzip 解压缩:
const { Writable } = require('stream');
let buf = '';
fs.createReadStream('myfile').pipe(
new Writable({
write: (chunk, enc, next) => {
const lines = buf.concat(chunk).split(/\r?\n/);
buf = lines.pop();
for (const line of lines) {
console.log(line);
}
next();
}
})
).on('finish', () => buf.length && console.log(buf) );
我用这个:
function emitLines(stream, re){
re = re && /\n/;
var buffer = '';
stream.on('data', stream_data);
stream.on('end', stream_end);
function stream_data(data){
buffer += data;
flush();
}//stream_data
function stream_end(){
if(buffer) stream.emmit('line', buffer);
}//stream_end
function flush(){
var re = /\n/;
var match;
while(match = re.exec(buffer)){
var index = match.index + match[0].length;
stream.emit('line', buffer.substring(0, index));
buffer = buffer.substring(index);
re.lastIndex = 0;
}
}//flush
}//emitLines
在流上使用此函数并侦听将发出的行事件。
gr-
虽然您可能应该按照最佳答案的建议使用 readline
模块,但 readline
似乎面向命令行界面而不是行阅读。关于缓冲,它也有点不透明。 (任何需要面向流线的阅读器的人都可能想要调整缓冲区大小)。 readline 模块大约是 1000 行,而带有统计和测试的这个模块是 34 行。
const EventEmitter = require('events').EventEmitter;
class LineReader extends EventEmitter{
constructor(f, delim='\n'){
super();
this.totalChars = 0;
this.totalLines = 0;
this.leftover = '';
f.on('data', (chunk)=>{
this.totalChars += chunk.length;
let lines = chunk.split(delim);
if (lines.length === 1){
this.leftover += chunk;
return;
}
lines[0] = this.leftover + lines[0];
this.leftover = lines[lines.length-1];
if (this.leftover) lines.pop();
this.totalLines += lines.length;
for (let l of lines) this.onLine(l);
});
// f.on('error', ()=>{});
f.on('end', ()=>{console.log('chars', this.totalChars, 'lines', this.totalLines)});
}
onLine(l){
this.emit('line', l);
}
}
//Command line test
const f = require('fs').createReadStream(process.argv[2], 'utf8');
const delim = process.argv[3];
const lineReader = new LineReader(f, delim);
lineReader.on('line', (line)=> console.log(line));
这是一个更短的版本,没有统计信息,只有 19 行:
class LineReader extends require('events').EventEmitter{
constructor(f, delim='\n'){
super();
this.leftover = '';
f.on('data', (chunk)=>{
let lines = chunk.split(delim);
if (lines.length === 1){
this.leftover += chunk;
return;
}
lines[0] = this.leftover + lines[0];
this.leftover = lines[lines.length-1];
if (this.leftover)
lines.pop();
for (let l of lines)
this.emit('line', l);
});
}
}
const fs = require("fs")
fs.readFile('./file', 'utf-8', (err, data) => {
var innerContent;
console.log("Asynchronous read: " + data.toString());
const lines = data.toString().split('\n')
for (let line of lines)
innerContent += line + '<br>';
});
我将日常线路处理的整个逻辑包装为一个 npm 模块:line-kit https://www.npmjs.com/package/line-kit
// 示例 var count = 0 require('line-kit')(require('fs').createReadStream('/etc/issue'), (line) => { count++; }, () => {console.日志(`看到 ${count} 行`)})
在验证它不是目录并且它不包含在不需要检查的文件列表中之后,我使用下面的代码读取行。
(function () {
var fs = require('fs');
var glob = require('glob-fs')();
var path = require('path');
var result = 0;
var exclude = ['LICENSE',
path.join('e2e', 'util', 'db-ca', 'someother-file'),
path.join('src', 'favicon.ico')];
var files = [];
files = glob.readdirSync('**');
var allFiles = [];
var patternString = [
'trade',
'order',
'market',
'securities'
];
files.map((file) => {
try {
if (!fs.lstatSync(file).isDirectory() && exclude.indexOf(file) === -1) {
fs.readFileSync(file).toString().split(/\r?\n/).forEach(function(line){
patternString.map((pattern) => {
if (line.indexOf(pattern) !== -1) {
console.log(file + ' contain `' + pattern + '` in in line "' + line +'";');
result = 1;
}
});
});
}
} catch (e) {
console.log('Error:', e.stack);
}
});
process.exit(result);
})();
我浏览了以上所有答案,都使用第三方库来解决它。它在 Node 的 API 中有一个简单的解决方案。例如
const fs= require('fs')
let stream = fs.createReadStream('<filename>', { autoClose: true })
stream.on('data', chunk => {
let row = chunk.toString('ascii')
}))
rl.on('close', cb)