如何从产生数据块的慢速处理侧线程流式传输超级请求的正文?

2023-12-01

我有一个程序可以缓慢地生成数据(我们可以说它是计算密集型的,就像计算 pi 的数字一样)。它产生一个lot数据的;每个响应可以是 1GiB,无法容纳在内存中,并且must按需生成。我正在使用 hyper 编写一个 Web 服务来根据请求生成内容。

让我们跳过样板(service_fn, Server::bind).

缓慢生成数据的 API 可能类似于

use std::io;

impl SlowData {
    fn new(initial: &str) -> SlowData {
        unimplemented!()
    }

    fn next_block(&self) -> io::Result<&[u8]> {
        unimplemented!()
    }
}

type ResponseFuture = Box<Future<Item = Response, Error = GenericError> + Send>;

fn run(req: Request) -> ResponseFuture {
    // spawn a thread and:
    // initialize the generator
    // SlowData::new(&req.uri().path());

    // spawn a thread and call slow.next_block() until len()==0
    // each byte which comes from next_block should go to the client
    // as part of the Body
}

注意SlowData::new也是计算密集型的。

最理想的是,我们会尽量减少副本并发送&[u8]直接到 hyper,无需将其复制到Vec或者其他的东西。

如何从侧线程完成超级请求的正文?


启动线程池中的线程并通过通道发送数据块。渠道实现Stream和一个超级Body可以由Stream using wrap_stream:

use futures::{channel::mpsc, executor::ThreadPool, task::SpawnExt, SinkExt, Stream}; // 0.3.1, features = ["thread-pool"]
use hyper::{
    service::{make_service_fn, service_fn},
    Body, Response, Server,
}; // 0.13.1
use std::{convert::Infallible, io, thread, time::Duration};
use tokio; // 0.2.6, features = ["macros"]

struct SlowData;
impl SlowData {
    fn new(_initial: &str) -> SlowData {
        thread::sleep(Duration::from_secs(1));
        Self
    }

    fn next_block(&self) -> io::Result<&[u8]> {
        thread::sleep(Duration::from_secs(1));
        Ok(b"data")
    }
}

fn stream(pool: ThreadPool) -> impl Stream<Item = io::Result<Vec<u8>>> {
    let (mut tx, rx) = mpsc::channel(10);

    pool.spawn(async move {
        let sd = SlowData::new("dummy");

        for _ in 0..3 {
            let block = sd.next_block().map(|b| b.to_vec());
            tx.send(block).await.expect("Unable to send block");
        }
    })
    .expect("Unable to spawn thread");

    rx
}

#[tokio::main]
async fn main() {
    // Construct our SocketAddr to listen on...
    let addr = ([127, 0, 0, 1], 3000).into();

    // Create a threadpool (cloning is cheap)...
    let pool = ThreadPool::new().unwrap();

    // Handle each connection...
    let make_service = make_service_fn(|_socket| {
        let pool = pool.clone();

        async {
            // Handle each request...
            let svc_fn = service_fn(move |_request| {
                let pool = pool.clone();

                async {
                    let data = stream(pool);
                    let resp = Response::new(Body::wrap_stream(data));

                    Result::<_, Infallible>::Ok(resp)
                }
            });

            Result::<_, Infallible>::Ok(svc_fn)
        }
    });

    // Bind and serve...
    let server = Server::bind(&addr).serve(make_service);

    // Finally, run the server
    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}

创建线程时,无法避免将切片复制到Vec.

也可以看看:

  • 这个答案适用于 hyper 0.12 和 future 0.1
  • 如何通过 io::Write 特征写入来通过 futures Stream 发送数据?
  • Rust 中 async/await 的目的是什么?
  • 在 future-rs 中封装阻塞 I/O 的最佳方法是什么?
  • 如何使用 futures.rs 和 Redis PubSub 实现阻塞调用的 future 流?
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何从产生数据块的慢速处理侧线程流式传输超级请求的正文? 的相关文章

随机推荐

  • Leaflet GeoJSON 是否可以在到达目的地之前裁剪线要素?

    有没有一种简单的方法可以缩短 GeoJSON 图层上的线条 我有一条线 它从 A 点到 B 点 我希望这条线在标记的半径附近停止 那可能吗 有点像从线路终点 起点的偏移量 这是一个例子 我有 50 x 50 的图标 但半透明 参见图片 并且
  • getIntent.getExtras.getString() 中的 null 值

    这是我在第一个活动中的代码 Intent i new Intent this OtherScreen class i putExtra id1 first i putExtra id2 second startActivity i 其中第一
  • 使用仅具有 id 值的实体保存外键

    如果我有两个休眠实体 例如 Entity class Company Id Integer id String name Entity class Person Integer id String name ManyToOne Compan
  • C++ 中的静态构造函数和致命错误 LNK1120: 1 无法解析的外部

    首先 我可能应该让你知道我绝不是一名程序员 我只是为了一项家庭作业而这样做 所以如果可能的话 我将需要一个非常详细的解释 我目前有一个 Node 类 用于存储点的坐标 除此之外 我想要用它做的是根据计数器为每个不同的 Node 对象分配一个
  • 在 R 中的 read.csv 中指定 colClasses 时出现问题

    我试图在 read csv 中指定 colClasses 以尝试加快 csv 文件的读取速度 但是 我遇到了以下问题 假设我有一个名为 t csv 的文件 a b x 0 然后 如果我在 R 中运行以下命令 data lt read csv
  • 客户端-服务器 Java GUI:读/写导致程序冻结

    我正在用 Java 编写客户端 服务器程序 包括 GUI 我在客户端有以下代码 public class SBListener implements ActionListener public void actionPerformed Ac
  • leetcode 的 Java 4sum 实现

    leetcode 的问题陈述是这样的 给定一个由 n 个整数组成的数组 S S 中是否存在元素 a b c 和 d 使得 a b c d target 找到数组中所有唯一的四元组 给出目标的总和 Note Elements in a qua
  • 如何继承带有模板的类? [复制]

    这个问题在这里已经有答案了 为什么下面的方法可以正常工作 class a public int n class b public a public b n 1 int main 但这不起作用 template
  • 实现查找表

    我正在开发一个自定义数据结构 目前正在进行 beta 测试过程 数据将存储在一个数组中 并且该数组可以表示为 4D 2D 和 1D 数组 这三个数组在联合中声明 因为它代表相同的内存寻址 这是给我的班级的声明 一些类 h ifndef So
  • Java 2012-006 更新破坏了 xCode 上传

    这与这个问题相关无法提交申请 然而 在 Apple 最近的 Java 更新 删除了 Web 插件 之后 Oracle 立即发布了 Java 7 的更新 现在 xcode 无法再上传 上传到 iTunes Store 时发生错误 并且旧的修复
  • 使用 Python 请求的异步请求

    我尝试了文档中提供的示例请求库对于蟒蛇 With async map rs 我得到了响应代码 但我想获取所请求的每个页面的内容 例如 这不起作用 out async map rs print out 0 content Note 下面的答案
  • Java Keylistener 没有打开窗口?

    我正在尝试用 Java 创建一个自动点击器 这是我所知道的语言 而且我刚刚学习了线程 我想让小程序在它自己的窗口中打开 而不是在网页上 并且我希望能够使用空格键启动和停止程序而不选择窗口 以便我可以在另一个程序上使用自动点击器并能够停止它而
  • React-Redux - 创建搜索过滤器[关闭]

    Closed 这个问题需要细节或清晰度 目前不接受答案 我需要帮助在我的应用程序中制作搜索过滤器 它是一个简单的应用程序 用于学习目的 目标是创建一个搜索过滤器 我有state在 search bar 容器中 我认为我需要将其作为道具传递给
  • 对于 TFS 构建,$(TeamBuildConstants) 为空

    我有一个像这样的构建后事件 如果不是 TeamBuildConstants TEAM BUILD SolutionDir Tools NuGet exe 包 ProjectDir MyAssembly nuspec BasePath Pro
  • 迭代时从列表中删除[重复]

    这个问题在这里已经有答案了 我有一个清单 a 1 2 3 4 5 6 7 8 9 b 10 11 12 13 14 15 16 17 18 遍历列表时b 如果任何数字小于15 则从列表中删除其对应的数字 索引 a 例如 在列表中b 10 1
  • Instagram,如何按标签获取所有照片?

    这样我就可以得到有多少帖子有 SOMETAG https api instagram com v1 tags SOMETAG 我可以通过标签获取图像 https api instagram com v1 tags SOMETAG media
  • 如何在关闭阶段 JavaFX 后停止 WebEngine?

    当我使用 WebEngine 创建新舞台来播放 YouTube 视频时 在我关闭它之后 Youtube 继续在后台播放 如果我使用 Platform exit 它会关闭我所有的JavaFX应用程序 但我只想关闭为YouTube创建的阶段 这
  • 将 SQL 与 Java 代码分离

    这是我在连接数据库时经常遇到的问题 如何将SQL从普通的java代码中分离出来 我通常为数据库连接使用单独的类 但是 当您有多个数据库并且每个数据库中有多个表时 总是很难 100 做到这一点 举个例子 如果我们想将所有的 java SQL
  • 如何使用Delphi正确执行.bat文件

    我正在运行 delphi 2010 中的 bat 文件 procedure TForm1 Button2Click Sender TObject var sCmd String Begin sCmd Pwidechar b4a c2dm b
  • 如何从产生数据块的慢速处理侧线程流式传输超级请求的正文?

    我有一个程序可以缓慢地生成数据 我们可以说它是计算密集型的 就像计算 pi 的数字一样 它产生一个lot数据的 每个响应可以是 1GiB 无法容纳在内存中 并且must按需生成 我正在使用 hyper 编写一个 Web 服务来根据请求生成内