send tcp char far_[Rust异步] TCP 回声服务

2023-05-16

async/await 将在 11 月的 Rust 1.39 稳定,是时候上车了!

本文将介绍如何用 async-std 编写一个 TCP 回声服务,以熟悉 Rust 中的异步编程模式。

项目初始化

新建一个 lib 项目,使用 nightly 编译。您需要安装 Rust 1.39 nightly 版本。

cargo new --lib async-echo
cd async-echo
echo nightly > rust-toolchain

在 Cargo.toml 中添加依赖。

[dependencies]
futures-preview = { version = "0.3.0-alpha.18", features = [ "async-await", "nightly" ] }
async-std = "0.99"

编写服务

完善的错误处理不是本文的重点,我们把所有错误直接向上抛。在异步编程中,错误有可能跨越线程,因此需要 Send + Sync + 'static.

pub type EcResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync + 'static>>;

协程 echo_server 接收套接字地址,监听对应端口。

Incoming 是一个异步流类型,迭代它可以取得连接到端口的 TCP 数据流。我们为每个数据流启动一个协程 echo. Incoming 是无限流,这代表 echo_server 永不终止,当关闭服务时,我们需要从外部取消协程。

use std::net::ToSocketAddrs;

use async_std::net::{Incoming, TcpListener, TcpStream};
use async_std::task;

use futures::StreamExt;

pub async fn echo_server(addr: impl ToSocketAddrs) -> EcResult<()> {
    let listener: TcpListener = TcpListener::bind(addr).await?;
    println!("listening: {}", listener.local_addr()?);
    let mut incoming: Incoming = listener.incoming();
    while let Some(stream) = incoming.next().await {
        let stream: TcpStream = stream?;
        task::spawn(echo(stream));
    }

    Ok(())
}

async fn echo(_stream: TcpStream) -> EcResult<()> {
    unimplemented!()
}

回声

TCP 数据流可读可写,从数据流同时产生 reader 和 writer 是安全的,因为 Safe API 允许我们这么做,可以放心。

还可以在 lib.rs 顶部加上 #![forbid(unsafe_code)],保证不调用 unsafe 代码。

以行为单位,每读一行,就向数据流中原样写回一行。当数据流不再产生数据时,说明对方关闭了连接,我们也关闭协程。

use async_std::io::{BufRead, BufReader, Write};

async fn echo(stream: TcpStream) -> EcResult<()> {
    let addr = stream.peer_addr()?;
    println!("[{} online]", addr);

    let (reader, mut writer) = (&stream, &stream);
    let reader = BufReader::new(reader);
    let mut lines = reader.lines();

    while let Some(line) = lines.next().await {
        let mut line = line?;
        println!("[{}]: {}", addr, line);
        line.push('n');
        writer.write_all(line.as_bytes()).await?;
    }

    println!("[{} offline]", addr);
    Ok(())
}

客户端

协程 echo_client 同时处理网络连接和标准输入,作为客户端。

连接到服务后,来自网络的流和来自标准输入的流可以一起处理。用 select 等待多个 future 中的一个就绪,立即处理结果,循环直到两个流中的一个关闭。

use async_std::io::stdin;
use futures::select;

pub async fn echo_client(addr: impl ToSocketAddrs) -> EcResult<()> {
    let stream = TcpStream::connect(addr).await?;
    println!("connecting: {}", stream.peer_addr()?);
    let (reader, mut writer) = (&stream, &stream);
    let mut server_lines = BufReader::new(reader).lines().fuse();
    let mut stdin_lines = BufReader::new(stdin()).lines().fuse();

    let handle_server_line = |line: String| println!("server: {}", line);

    let handle_stdin_line = move |mut line: String| {
        println!("input: {}", line);
        line.push('n');
        async move { writer.write_all(line.as_bytes()).await }
    };

    loop {
        select! {
            line = server_lines.next() => match line{
                None => {
                    println!("Connection was closed by server");
                    break;
                }
                Some(line) => handle_server_line(line?),
            },
            line = stdin_lines.next() => match line{
                None => break,
                Some(line) => handle_stdin_line(line?).await?,
            },
        }
    }

    Ok(())
}

执行

创建两个可执行程序。

cd src
mkdir bin
cd bin
touch server.rs client.rs

在 server.rs 中,等待标准输入中输入 "exit",在此期间运行服务。 如果出错,就直接打印出错误内容,也可以直接 unwarp,便于查看 backtrace。

use async_echo::{echo_server, EcResult};

use async_std::io::{stdin, BufRead, BufReader};
use async_std::stream::Stream;
use async_std::task;

async fn wait_exitus() -> EcResult<()> {
    let mut lines = BufReader::new(stdin()).lines();

    while let Some(line) = lines.next().await {
        if line? == "exit" {
            break;
        }
    }

    Ok(())
}

fn main() {
    task::spawn(async {
        if let Err(e) = echo_server("127.0.0.1:9102").await {
            eprintln!("{}", e);
        }
    });
    let _ = task::block_on(wait_exitus());
}

client.rs 非常简单,直接执行协程 echo_client 即可。

use async_echo::echo_client;

use async_std::task;

fn main() {
    if let Err(e) = task::block_on(echo_client("127.0.0.1:9102")) {
        eprintln!("{}", e)
    }
}

测试

在不同的终端运行以下命令

cargo run --release --bin server
cargo run --release --bin client

服务端

listening: 127.0.0.1:9102
[127.0.0.1:51234 online]
[127.0.0.1:51234]: hello
[127.0.0.1:51234 offline]
[127.0.0.1:51236 online]
[127.0.0.1:51244 online]
[127.0.0.1:51248 online]
[127.0.0.1:51248]: A
[127.0.0.1:51244]: B
[127.0.0.1:51236]: C
[127.0.0.1:51236 offline]
[127.0.0.1:51248 offline]
exit

客户端

connecting: 127.0.0.1:9102
B
input: B
server: B
Connection was closed by server

仓库

Nugine/async-echo​github.com
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

send tcp char far_[Rust异步] TCP 回声服务 的相关文章

随机推荐