为什么这个 readline 异步迭代器无法正常工作?

2024-03-17

这是一个更大流程的一部分,我在节点 v14.4.0 中将其提炼为最小的、可重现的示例。在此代码中,它从内部不输出任何内容for loop.

我在控制台中只看到这个输出:

before for() loop
finished
finally
done

The for await (const line1 of rl1)循环永远不会进入for循环 - 它只是跳过它:

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');

async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        await once(stream1, 'open');
        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});

        const stream2 = fs.createReadStream(file2);
        await once(stream2, 'open');
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("data/numbers.txt", "data/letters.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
})

但是,如果我删除其中任何一个await once(stream, 'open')语句,那么for循环完全按照预期执行(列出了rl1文件)。因此,显然,来自 readline 接口和流之间的异步迭代器存在一些计时问题。任何想法可能会发生什么。知道什么可能导致这个问题或如何解决它吗?

仅供参考,await once(stream, 'open')是否存在因为异步迭代器中的另一个错误,如果打开文件时出现问题,它不会拒绝,而await once(stream, 'open')如果文件无法打开(本质上是在打开之前进行预检),则会导致您正确地收到拒绝。

如果您想知道为什么存在 Stream2 代码,它是在较大的项目中使用的,但我已将此示例简化为最小的、可重现的示例,并且只需要这么多代码来演示该问题。


Edit:在尝试稍微不同的实现时,我发现如果我将两者结合起来once(stream, "open")调用一个Promise.all(),然后它就可以工作了。所以,这有效:

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');


async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
        const stream2 = fs.createReadStream(file2);
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
        // pre-flight file open to catch any open errors here
        // because of existing bug in async iterator with file open errors
        await Promise.all([once(stream1, "open"), once(stream2, "open")]);

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("data/numbers.txt", "data/letters.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
});

这显然不应该对您等待文件打开的方式敏感。某处存在一些计时错误。我想在 readline 或 readStream 上找到该错误并将其归档。有任何想法吗?


事实证明,根本问题是readline.createInterface()立即,在调用时它会添加一个data事件监听器(代码参考这里 https://github.com/nodejs/node/blob/master/lib/readline.js#L236)并恢复流以开始流流动。

input.on('data', ondata);

and

input.resume();

然后,在ondata侦听器,它解析行数据,当找到行时,它会触发line events here https://github.com/nodejs/node/blob/master/lib/readline.js#L481.

for (let n = 0; n < lines.length; n++)
  this._onLine(lines[n]);

但是,在我的示例中,在这段时间之间还发生了其他异步事情readline.createInterface()被调用并创建了异步迭代器(它将侦听line事件)。所以,line事件正在被发出,但还没有任何东西在监听它们。

所以,要正常工作readline.createInterface()要求无论要听什么line调用后必须同步添加事件readline.createInterface()或者存在竞争条件并且line 事件可能会丢失。


在我的原始代码示例中,解决此问题的可靠方法是不调用readline.createInterface()直到我完成之后await once(...)。然后,异步迭代器将在之后同步创建readline.createInterface()叫做。

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');


async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        const stream2 = fs.createReadStream(file2);
        // wait for both files to be open to catch any "open" errors here
        // since readline has bugs about not properly reporting file open errors
        // this await must be done before either call to readline.createInterface()
        // to avoid race conditions that can lead to lost lines of data
        await Promise.all([once(stream1, "open"), once(stream2, "open")]);

        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("data/numbers.txt", "data/letters.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
});

解决这个普遍问题的一种方法是改变readline.createInterface()这样它就不会添加data事件并恢复流,直到有人添加line事件监听器。这可以防止数据丢失。它将允许 readline 接口对象安静地坐在那里,而不会丢失数据,直到其输出的接收器真正准备好为止。这适用于异步迭代器,并且还可以防止混合了其他异步代码的接口的其他使用可能会丢失line events.

请注意将此添加到相关的开放阅读行错误问题中here https://github.com/nodejs/node/issues/34219#issuecomment-657921958.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

为什么这个 readline 异步迭代器无法正常工作? 的相关文章

随机推荐