- Expected Behaviour
- Actual Behaviour
- What I've Tried
- Steps To Reproduce
- Research
Expected Behaviour
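Pipe multiple readable streams, received from multiple api requests, to a single writable stream.
The api responses come from ibm-watson's textToSpeech.synthesize() method (https://cloud.ibm.com/apidocs/text-to-speech?code=node#synthesize-audio).
Multiple requests are needed because the service has a 5KB limit on text input.
Therefore a string of 18KB, for example, requires four requests to complete.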
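A minimal sketch of that splitting step, assuming the limit is counted in UTF-8 bytes and taking 5000 bytes as a conservative reading of "5KB" (the exact figure is an assumption; check the service documentation). splitTextByBytes is a hypothetical helper: it splits on whitespace, so runs of whitespace collapse to single spaces, and it throws if a single word is larger than the limit.

```javascript
// Assumed per-request limit in UTF-8 bytes; verify against the service docs.
const MAX_BYTES = 5000;

// Hypothetical helper: greedily pack whitespace-separated words into chunks
// whose UTF-8 byte length never exceeds maxBytes.
function splitTextByBytes(text, maxBytes = MAX_BYTES) {
  const words = text.split(/\s+/).filter(Boolean);
  const chunks = [];
  let current = '';
  for (const word of words) {
    if (Buffer.byteLength(word, 'utf8') > maxBytes) {
      throw new Error('a single word exceeds the byte limit');
    }
    const candidate = current ? `${current} ${word}` : word;
    if (Buffer.byteLength(candidate, 'utf8') > maxBytes) {
      chunks.push(current); // current chunk is full, start a new one
      current = word;
    } else {
      current = candidate;
    }
  }
  if (current) chunks.push(current);
  return chunks;
}
```

For an 18,000-byte input made of repeated 5-byte words ('abcd '), this produces four chunks, which matches the four requests described above.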
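Actual Behaviour
The file produced by the writable stream is incomplete and garbled.
The application seems to 'hang'.
When I try to open the incomplete .mp3 file in an audio player, it says the file is corrupted.
Opening and closing the file seems to increase its file size, as if opening the file somehow prompts more data to flow into it.
The undesirable behaviour is more apparent with larger inputs, eg four strings of 4000 bytes or less.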
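What I've Tried
I have tried several methods to pipe the readable streams to either a single writable stream or multiple writable streams, using the npm packages combined-stream (https://www.npmjs.com/package/combined-stream), combined-stream2 (https://www.npmjs.com/package/combined-stream2), multistream (https://www.npmjs.com/package/multistream) and archiver (https://www.npmjs.com/package/archiver), and they all result in incomplete files. My last attempt doesn't use any packages and is shown in the Steps To Reproduce section below.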
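I am therefore questioning each part of my application logic:
01. What is the response type of a Watson text to speech api request?
The text to speech docs (https://cloud.ibm.com/apidocs/text-to-speech?code=node#synthesize-audio) say the api response type is:
Response type: NodeJS.ReadableStream|FileObject|Buffer
I am confused that the response type is one of three possible things.
In all my attempts, I have been assuming it is a readable stream.
02. Can I make multiple api requests in a map function?
03. Can I wrap each request within a promise() and resolve the response?
04. Can I assign the resulting array to a promises variable?
05. Can I declare var audio_files = await Promise.all(promises)?
06. After this declaration, are all responses 'finished'?
07. How do I correctly pipe each response to a writable stream?
08. How do I detect when all pipes have finished, so I can send the file back to the client?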
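For question 01, one way to avoid depending on which of the documented types comes back is to normalise the response before piping it. A sketch, assuming only the stream and Buffer cases need handling; toReadable is a hypothetical helper, and the documented FileObject case is not handled here and raises an error.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: return something that can be piped.
// - anything with a pipe() method is treated as a readable stream as-is
// - a Buffer is wrapped in a one-chunk readable stream
// - every other shape (including an unknown "FileObject") is rejected
function toReadable(audio) {
  if (audio && typeof audio.pipe === 'function') {
    return audio;
  }
  if (Buffer.isBuffer(audio)) {
    return Readable.from([audio]);
  }
  throw new TypeError('unsupported audio response type');
}
```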
For questions 2 - 6, I am assuming the answer is 'YES'.
I think my failures relate to questions 7 and 8.
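Questions 02 to 06 can be checked in isolation with a stand-in for the SDK. In this sketch fakeSynthesize is a hypothetical replacement for the callback form of textToSpeech.synthesize() that calls back, after a random delay, with a readable stream that has not been read yet; it does not model how the real SDK buffers its responses.

```javascript
const { Readable } = require('stream');

// Hypothetical stand-in for synthesize(params, callback): after a random
// delay it calls back with an unread readable stream of fake "audio".
function fakeSynthesize(params, callback) {
  setTimeout(() => {
    callback(null, Readable.from([Buffer.from(`<${params.text}>`)]));
  }, Math.random() * 20);
}

// Questions 02-04: map each string to a Promise wrapping one request.
function synthesizeAll(textStrings) {
  const promises = textStrings.map(text => new Promise((resolve, reject) => {
    fakeSynthesize({ text }, (err, audio) => (err ? reject(err) : resolve(audio)));
  }));
  // Question 05: Promise.all keeps the input order regardless of the order
  // in which the requests complete, and rejects as soon as one fails.
  return Promise.all(promises);
}
```

In this stand-in, Promise.all resolves once every stream object exists, in input order, but none of the streams has been read yet, so "all responses resolved" is not the same as "all audio written".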
Steps To Reproduce
You can test this code with an array of four randomly generated text strings with byte sizes of 3975, 3863, 3974 and 3629 bytes (here is a pastebin of that array: https://pastebin.com/raw/JkK8ehwV).
// dependencies (omitted from the original snippet)
const express = require("express");
const fs = require("fs");
const path = require("path");
const TextToSpeechV1 = require("ibm-watson/text-to-speech/v1");

const app = express();

// iam_apikey, tts_service_url and text_string_array are defined elsewhere,
// eg text_string_array: https://pastebin.com/raw/JkK8ehwV

// route handler middleware
const api_tts_get = async (req, res) => {
  var query_parameters = req.query;
  var file_name = query_parameters.file_name;
  var absolute_path = path.join(__dirname, "/src/temp_audio/", file_name);
  var relative_path = path.join("./src/temp_audio/", file_name); // path relative to server root

  // for each string in an array, send it to the watson api
  var promises = text_string_array.map(text_string => {
    return new Promise((resolve, reject) => {
      // credentials
      var textToSpeech = new TextToSpeechV1({
        iam_apikey: iam_apikey,
        url: tts_service_url
      });
      // params
      var synthesizeParams = {
        text: text_string,
        accept: 'audio/mp3',
        voice: 'en-US_AllisonV3Voice'
      };
      // make request
      textToSpeech.synthesize(synthesizeParams, (err, audio) => {
        if (err) {
          console.log("synthesize - an error occurred: ");
          return reject(err);
        }
        resolve(audio);
      });
    });
  });

  try {
    // wait for all responses
    var audio_files = await Promise.all(promises);
    var audio_files_length = audio_files.length;
    var write_stream = fs.createWriteStream(`${relative_path}.mp3`);
    audio_files.forEach((audio, index) => {
      // if this is the last value in the array,
      // pipe it to write_stream,
      // when finished, the readable stream will emit 'end'
      // then the .end() method will be called on write_stream
      // which will trigger the 'finish' event on the write_stream
      if (index == audio_files_length - 1) {
        audio.pipe(write_stream);
      }
      // if not the last value in the array,
      // pipe to write_stream and leave open
      else {
        audio.pipe(write_stream, { end: false });
      }
    });
    write_stream.on('finish', function() {
      // download the file (using absolute_path)
      res.download(`${absolute_path}.mp3`, (err) => {
        if (err) {
          console.log(err);
        }
        // delete the file (using relative_path)
        fs.unlink(`${relative_path}.mp3`, (err) => {
          if (err) {
            console.log(err);
          }
        });
      });
    });
  } catch (err) {
    console.log("there was an error getting tts");
    console.log(err);
    res.sendStatus(500); // respond so the request does not hang on errors
  }
};

// route handler (registered after api_tts_get is defined)
app.route("/api/:api_version/tts")
  .get(api_tts_get);
The official example (https://cloud.ibm.com/apidocs/text-to-speech?code=node#synthesize-audio) shows:
textToSpeech.synthesize(synthesizeParams)
  .then(audio => {
    audio.pipe(fs.createWriteStream('hello_world.mp3'));
  })
  .catch(err => {
    console.log('error:', err);
  });
As far as I can tell, this works for a single request, but not for multiple requests.
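A sketch of one way to extend that to several responses, addressing questions 07 and 08 under the assumption that each response is a readable stream: pipe the responses into the one writable stream strictly one after another with { end: false }, start the next pipe only after the previous source emits 'end', then call end() on the writable and wait for its 'finish' event. concatStreams is a hypothetical helper, and its error handling is minimal.

```javascript
const { once } = require('events');

// Hypothetical helper: append readable streams to one writable stream strictly
// one after another, then end the writable and resolve once it emits 'finish'.
async function concatStreams(readables, writable) {
  try {
    for (const readable of readables) {
      // keep the destination open between sources
      readable.pipe(writable, { end: false });
      // start the next source only after this one has emitted 'end'
      // (once() rejects if the source emits 'error' instead)
      await once(readable, 'end');
    }
    const finished = once(writable, 'finish'); // listen before ending
    writable.end();
    await finished;
  } catch (err) {
    writable.destroy(); // a source failed: discard the partial output
    throw err;
  }
}
```

In the route handler above, the forEach loop and the separate 'finish' listener could be replaced by await concatStreams(audio_files, write_stream); followed by the res.download(...) call. This only controls the order in which the bytes are written; whether byte-concatenated MP3 responses play back cleanly in every player is a separate question.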
Research
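Regarding readable and writable streams, readable stream modes (flowing and paused), the 'data', 'end', 'drain' and 'finish' events, pipe(), fs.createReadStream() and fs.createWriteStream():
Almost all Node.js applications, no matter how simple, use streams in some manner...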
const http = require('http');

const server = http.createServer((req, res) => {
  // `req` is an http.IncomingMessage, which is a Readable Stream
  // `res` is an http.ServerResponse, which is a Writable Stream
  let body = '';
  // get the data as utf8 strings.
  // if an encoding is not set, Buffer objects will be received.
  req.setEncoding('utf8');
  // readable streams emit 'data' events once a listener is added
  req.on('data', (chunk) => {
    body += chunk;
  });
  // the 'end' event indicates that the entire body has been received
  req.on('end', () => {
    try {
      const data = JSON.parse(body);
      // write back something interesting to the user:
      res.write(typeof data);
      res.end();
    } catch (er) {
      // uh oh! bad json!
      res.statusCode = 400;
      return res.end(`error: ${er.message}`);
    }
  });
});
https://nodejs.org/api/stream.html#stream_api_for_stream_consumers
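Readable streams have two main modes that affect the way we can consume them... they can be either in the paused mode or in the flowing mode. All readable streams start in the paused mode by default, but they can be easily switched to flowing and back to paused when needed... just adding a data event handler switches a paused stream into flowing mode, and removing the data event handler switches the stream back to paused mode.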
https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93
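A small check of these modes through readable.readableFlowing, which is null before any consumer is attached and then true or false. The Node.js stream documentation states that removing 'data' handlers does not automatically pause the stream, so this sketch uses pause() to switch back; flowingStates is a hypothetical helper.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: record readableFlowing as the stream changes modes.
function flowingStates() {
  const r = new Readable({ read() {} }); // no-op read(): nothing is ever pushed
  const onData = () => {};
  const states = [r.readableFlowing];    // null: no consumer attached yet
  r.on('data', onData);
  states.push(r.readableFlowing);        // true: a 'data' listener starts flowing mode
  r.removeListener('data', onData);
  states.push(r.readableFlowing);        // still true: removal does not pause
  r.pause();
  states.push(r.readableFlowing);        // false: explicitly paused
  r.resume();
  states.push(r.readableFlowing);        // true: flowing again
  return states;                         // [null, true, true, false, true]
}
```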
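Here's a list of the important events and functions that can be used with readable and writable streams.
The most important events on a readable stream are:
- The data event, which is emitted whenever the stream passes a chunk of data to the consumer.
- The end event, which is emitted when there is no more data to be consumed from the stream.
The most important events on a writable stream are:
- The drain event, which is a signal that the writable stream can receive more data.
- The finish event, which is emitted when all data has been flushed to the underlying system.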
https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93
.pipe() takes care of listening for 'data' and 'end' events from the fs.createReadStream().
https://github.com/substack/stream-handbook#why-you-should-use-streams
.pipe() is just a function that takes a readable source stream src and hooks the output to a destination writable stream dst.
https://github.com/substack/stream-handbook#pipe
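A simplified sketch of what .pipe() does with the four events listed above: forward each 'data' chunk, pause the source when write() returns false and resume it on 'drain', and call end() on the destination after 'end', which eventually leads to 'finish'. manualPipe is a hypothetical helper; the real pipe() also handles errors, unpipe() and multiple destinations, all omitted here.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical, simplified pipe(): no error handling, no unpipe().
function manualPipe(readable, writable, onFinish) {
  readable.on('data', chunk => {
    // write() returns false once the writable's buffer reaches highWaterMark
    if (!writable.write(chunk)) {
      readable.pause();                                // stop the flow...
      writable.once('drain', () => readable.resume()); // ...until 'drain'
    }
  });
  readable.on('end', () => writable.end()); // the default pipe() behaviour
  writable.on('finish', onFinish);          // all data flushed
}
```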
The return value of the pipe() method is the destination stream.
https://flaviocopes.com/nodejs-streams/#pipe
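Because pipe() returns its destination, calls can be chained left to right. A sketch with a hypothetical upper-casing Transform (upperCase and shout are illustrative names only):

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical Transform, used only to show chaining.
function upperCase() {
  return new Transform({
    transform(chunk, encoding, callback) {
      callback(null, chunk.toString().toUpperCase());
    }
  });
}

// pipe() returns its destination, so src.pipe(a).pipe(b) reads left to right.
function shout(text) {
  const upper = upperCase();
  const returned = Readable.from([text]).pipe(upper);
  return { returned, upper };
}
```

Note that pipe() does not forward errors along such a chain; stream.pipeline() reports an error from any stage to a single callback and destroys the streams involved.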
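By default, stream.end() (https://nodejs.org/api/stream.html#stream_writable_end_chunk_encoding_callback) is called on the destination Writable stream when the source Readable stream emits 'end', so that the destination is no longer writable. To disable this default behavior, the end option can be passed as false, causing the destination stream to remain open: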
https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options
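A sketch of the { end: false } behaviour: after the source emits 'end', the destination has not been ended (its writableEnded property is still false), so more data can be written before calling end() manually. pipeKeepOpen is a hypothetical helper, and the PassThrough stands in for any writable destination.

```javascript
const { Readable, PassThrough } = require('stream');

// Hypothetical helper: pipe one source with { end: false } and resolve with
// the destination once the source has emitted 'end'.
function pipeKeepOpen(text) {
  const dest = new PassThrough();
  const src = Readable.from([text]);
  src.pipe(dest, { end: false }); // do not end dest when src ends
  return new Promise(resolve => src.on('end', () => resolve(dest)));
}
```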
The 'finish' event is emitted after the stream.end() method has been called, and all data has been flushed to the underlying system.
const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`);
}
writer.end('This is the end\n');
writer.on('finish', () => {
  console.log('All writes are now complete.');
});
https://nodejs.org/api/stream.html#stream_event_finish
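A runnable variant of that 'finish' example, using a hypothetical in-memory Writable in place of getWritableStreamSomehow(): 'finish' has not fired after the writes alone, and fires only once end() has been called and the writes have completed. writeAndWait is an illustrative name.

```javascript
const { Writable } = require('stream');
const { once } = require('events');

// Hypothetical helper: write some lines, then end the stream and wait for 'finish'.
async function writeAndWait(lines) {
  const written = [];
  const writer = new Writable({
    // in-memory sink standing in for getWritableStreamSomehow()
    write(chunk, encoding, callback) {
      written.push(chunk.toString());
      callback();
    }
  });
  let finished = false;
  writer.on('finish', () => { finished = true; });
  for (const line of lines) {
    writer.write(line);
  }
  const finishedBeforeEnd = finished;  // false: end() has not been called yet
  const done = once(writer, 'finish'); // listen before ending
  writer.end('This is the end\n');
  await done;
  return { written, finishedBeforeEnd, finished };
}
```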
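If you're trying to read multiple files and pipe them to a writable stream, you have to pipe each one to the writable stream and pass end: false when doing it, because by default, a readable stream ends the writable stream when there's no more data to be read. Here's an example: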
var ws = fs.createWriteStream('output.pdf');
fs.createReadStream('pdf-sample1.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample2.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample3.pdf').pipe(ws);
https://stackoverflow.com/a/30916248
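The quoted example starts all the pipes at the same time. A sketch with two hypothetical timed sources (delayedSource stands in for a response that arrives in pieces) shows that chunks from concurrently piped sources reach the destination in arrival order, so they can interleave rather than arrive grouped by source.

```javascript
const { Readable, PassThrough } = require('stream');

// Hypothetical source: pushes `count` labelled chunks, one every `delayMs`.
function delayedSource(label, count, delayMs) {
  let sent = 0;
  return new Readable({
    read() {
      setTimeout(() => {
        sent += 1;
        this.push(sent <= count ? `${label}${sent} ` : null);
      }, delayMs);
    }
  });
}

// Pipe two sources into one destination at the same time, as in the quoted
// example, end the destination after both sources end, and collect the output.
function pipeConcurrently() {
  const dest = new PassThrough();
  const a = delayedSource('a', 3, 10);
  const b = delayedSource('b', 3, 10);
  a.pipe(dest, { end: false });
  b.pipe(dest, { end: false });
  let ended = 0;
  const onEnd = () => { ended += 1; if (ended === 2) dest.end(); };
  a.on('end', onEnd);
  b.on('end', onEnd);
  return new Promise(resolve => {
    let out = '';
    dest.setEncoding('utf8');
    dest.on('data', chunk => { out += chunk; });
    dest.on('end', () => resolve(out));
  });
}
```

The exact order depends on timing, but with equal delays the output is typically interleaved (a1 b1 a2 ...) rather than all of a followed by all of b.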
You want to add the second read into an event listener for the first read to finish...
var fs = require('fs');
var a = fs.createReadStream('a');
var b = fs.createReadStream('b');
var c = fs.createWriteStream('c');
a.pipe(c, {end:false});
a.on('end', function() {
  b.pipe(c);
});
https://stackoverflow.com/a/28033554
A Brief History of Node Streams, part one (https://medium.com/the-node-js-collection/a-brief-history-of-node-streams-pt-1-3401db451f21) and part two (https://medium.com/the-node-js-collection/a-brief-history-of-node-streams-pt-2-bcb6b1fd7468).
Related Google search:
how to pipe multiple readable streams to a single writable stream? nodejs
Questions covering the same or similar topic, without authoritative answers (or that might be 'outdated'):
How to pipe multiple ReadableStreams to a single WriteStream? https://stackoverflow.com/questions/54486160/how-to-pipe-multiple-readablestreams-to-a-single-writestream
Piping to same Writable stream twice via different Readable stream https://stackoverflow.com/questions/46504589/piping-to-same-writable-stream-twice-via-different-readable-stream
Pipe multiple files to one response https://stackoverflow.com/questions/10462649/pipe-multiple-files-to-one-response
Creating a Node.js stream from two piped streams https://stackoverflow.com/questions/17471659/creating-a-node-js-stream-from-two-piped-streams