ASIO 示例代码在应该之前关闭套接字

2023-12-15

我需要一个使用 ASIO 的并行同步 TCP 解决方案。我正在尝试从这些示例中获取示例代码:https://github.com/jvillasante/asio-network-programming-cookbook/tree/master/src(使用ch04中的服务器:02_Sync_parallel_tcp_server.cpp和ch03中的客户端:01_Sync_tcp_client.cpp)。

我唯一改变的是附加到文本文件的日志记录。

问题是,虽然服务器运行良好,但客户端在从服务器返回单个响应后就死掉了:

libc++abi.dylib: terminating with uncaught exception of type boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::system::system_error> >: shutdown: Socket is not connected

服务器代码:

#include <boost/asio.hpp>
#include <atomic>
#include <memory>
#include <thread>
#include <iostream>
#include <fstream>

using namespace boost;

class Service {
public:
  Service() = default;

  void StartHandlingClient(std::shared_ptr<asio::ip::tcp::socket> sock) {
    std::thread th{[this, sock]() { HandleClient(sock); }};
    th.detach();
  }

private:
  void HandleClient(std::shared_ptr<asio::ip::tcp::socket> sock) {
    try {
      asio::streambuf request;
      asio::read_until(*sock.get(), request, '\n');

      std::istream is(&request);
      std::string line;
      std::getline(is, line);

      std::ofstream log("logfile2.txt", std::ios_base::app | std::ios_base::out);
      log << "Request: " << line << "\n" << std::flush;

      // Emulate request processing.
      int i = 0;
      while (i != 1000000) i++;
      std::this_thread::sleep_for(std::chrono::milliseconds(500));

      // Sending response.
      std::string response = "Response\n";
      asio::write(*sock.get(), asio::buffer(response));
    } catch (std::system_error& e) {
      std::ofstream log("logfile1.txt", std::ios_base::app | std::ios_base::out);
      log << "Error occurred! Error code = " << e.code().value() << ". Message: " << e.what() << "\n" << std::flush;
    }

    // Clean up
    delete this;
  }
};

class Acceptor {
public:
  Acceptor(asio::io_service& ios, unsigned short port_num)
  : m_ios{ios}, m_acceptor{m_ios, asio::ip::tcp::endpoint{asio::ip::address_v4::any(), port_num}} {
    m_acceptor.listen();
  }

  void Accept() {
    auto sock = std::make_shared<asio::ip::tcp::socket>(m_ios);

    m_acceptor.accept(*sock.get());

    (new Service)->StartHandlingClient(sock);
  }

private:
  asio::io_service& m_ios;
  asio::ip::tcp::acceptor m_acceptor;
};

class Server {
public:
  Server() : m_stop{false} {}

  void Start(unsigned short port_num) {
    m_thread.reset(new std::thread([this, port_num]() { Run(port_num); }));
  }

  void Stop() {
    m_stop.store(true);
    m_thread->join();
  }

private:
  void Run(unsigned short port_num) {
    Acceptor acc{m_ios, port_num};

    while (!m_stop.load()) {
      acc.Accept();
    }
  }

private:
  std::unique_ptr<std::thread> m_thread;
  std::atomic<bool> m_stop;
  asio::io_service m_ios;
};

int main() {
  unsigned short port_num = 3333;

  try {
    Server srv;
    srv.Start(port_num);

    std::this_thread::sleep_for(std::chrono::seconds(60));

    srv.Stop();
  } catch (std::system_error& e) {
      std::ofstream log("logfile1.txt", std::ios_base::app | std::ios_base::out);
      log << "Error occurred! Error code = " << e.code().value() << ". Message: " << e.what() << "\n" << std::flush;
  }

  return 0;
}

客户端代码:

#include <boost/asio.hpp>
#include <iostream>
#include <fstream>

using namespace boost;

class SyncTCPClient {
public:
  SyncTCPClient(const std::string& raw_ip_address, unsigned short port_num)
  : m_ep(asio::ip::address::from_string(raw_ip_address), port_num), m_sock(m_ios) {
    m_sock.open(m_ep.protocol());
  }
  ~SyncTCPClient() { close(); }

  void connect() { m_sock.connect(m_ep); }

  std::string emulateLongComputationOp(unsigned int duration_sec) {
    std::string request = "EMULATE_LONG_COMP_OP " + std::to_string(duration_sec) + "\n";
    sendRequest(request);
    return receiveResponse();
  }

private:
  void close() {
    if (m_sock.is_open()) {
      std::ofstream log("logfile1.txt", std::ios_base::app | std::ios_base::out);
      log << "shutting down\n" << std::flush;
      m_sock.shutdown(asio::ip::tcp::socket::shutdown_both);
      log << "closing the socket\n" << std::flush;
      m_sock.close();
      log << "socket closed\n" << std::flush;
    }
  }

  void sendRequest(const std::string& request) { asio::write(m_sock, asio::buffer(request)); }

  std::string receiveResponse() {
    asio::streambuf buf;
    asio::read_until(m_sock, buf, '\n');

    std::istream input(&buf);
    std::string response;
    std::getline(input, response);

    return response;
  }

private:
  asio::io_service m_ios;
  asio::ip::tcp::endpoint m_ep;
  asio::ip::tcp::socket m_sock;
};

int main() {
  const std::string raw_ip_address = "127.0.0.1";
  const unsigned short port_num = 3333;

  try {
    SyncTCPClient client{raw_ip_address, port_num};

    // Sync connect.
    client.connect();

    std::cout << "Sending request to the server...\n";
    std::string response = client.emulateLongComputationOp(10);

    std::cout << "Response received: " << response << "\n";
  } catch (std::system_error& e) {
      std::ofstream log("logfile1.txt", std::ios_base::app | std::ios_base::out);
      log << "Error occurred! Error code = " << e.code().value() << ". Message: " << e.what() << "\n" << std::flush;
    return e.code().value();
  }

  return 0;
}

我没有看到太多错误,并且我无法使用所示代码重现问题。

事情我do see:

  1. 线程过程可以是静态的,因为它是无状态的(delete this是代码味道)
  2. 线程不需要分离(使用boost::thread_group::join_all会好很多)
  3. 您从服务器和客户端写入相同的日志文件;结果未定义
  4. 拼写.store() and .load() on an atomic<bool>不惯用
  5. 拼出*sock.get()任何类型的智能指针上都是不可原谅地不惯用的
  6. writing code().value()- 吞并这个类别 - 是一件坏事,并且e.what()不是获取消息的方式(使用e.code().message()).
  7. 如果你需要flush,你不妨使用std::endl
  8. 确实没有理由在 c++14 中使用shared_ptr:

    asio::ip::tcp::socket sock(m_ios);
    
    m_acceptor.accept(sock);
    
    std::thread([sock=std::move(sock)]() mutable { HandleClient(sock); }).detach();
    

    在 C++11 中坚持:

    auto sock = std::make_shared<asio::ip::tcp::socket>(m_ios);
    
    m_acceptor.accept(*sock);
    
    std::thread([sock] { HandleClient(*sock); }).detach();
    

    这意味着HandleClient可以只取一个ip::tcp::socket&而不是智能指针。

整合

服务器.cpp

#include <atomic>
#include <boost/asio.hpp>
#include <fstream>
#include <iostream>
#include <memory>
#include <thread>

using namespace boost;

static void HandleClient(asio::ip::tcp::socket& sock) {
    try {
        asio::streambuf buf;
        asio::read_until(sock, buf, '\n');

        std::string request;
        getline(std::istream(&buf), request);

        std::ofstream log("server.log", std::ios_base::app | std::ios_base::out);
        log << "Request: " << request << std::endl;

        // Emulate request processing.
        int i = 0;
        while (i != 1000000)
            i++;
        std::this_thread::sleep_for(std::chrono::milliseconds(500));

        // Sending response.
        std::string response = "Response\n";
        asio::write(sock, asio::buffer(response));
    } catch (std::system_error &e) {
        std::ofstream log("server.log", std::ios_base::app | std::ios_base::out);
        log << e.what() << " " << e.code() << ": " << e.code().message() << std::endl;
    }
}

class Acceptor {
  public:
    Acceptor(asio::io_service &ios, unsigned short port_num)
            : m_ios{ ios }, m_acceptor{ m_ios, asio::ip::tcp::endpoint{ asio::ip::address_v4::any(), port_num } } {
        m_acceptor.listen();
    }

    void Accept() {
        auto sock = std::make_shared<asio::ip::tcp::socket>(m_ios);

        m_acceptor.accept(*sock);

        std::thread([sock] { HandleClient(*sock); }).detach();
    }

  private:
    asio::io_service &m_ios;
    asio::ip::tcp::acceptor m_acceptor;
};

class Server {
  public:
    Server() : m_stop{ false } {}

    void Start(unsigned short port_num) {
        m_thread.reset(new std::thread([this, port_num]() { Run(port_num); }));
    }

    void Stop() {
        m_stop = true;
        m_thread->join();
    }

  private:
    void Run(unsigned short port_num) {
        Acceptor acc{ m_ios, port_num };

        while (!m_stop) {
            acc.Accept();
        }
    }

  private:
    std::unique_ptr<std::thread> m_thread;
    std::atomic<bool> m_stop;
    asio::io_service m_ios;
};

int main() {
    unsigned short port_num = 3333;

    try {
        Server srv;
        srv.Start(port_num);

        std::this_thread::sleep_for(std::chrono::seconds(60));

        srv.Stop();
    } catch (std::system_error &e) {
        std::ofstream log("server.log", std::ios_base::app | std::ios_base::out);
        log << e.what() << " " << e.code() << ": " << e.code().message() << std::endl;
    }
}

客户端.cpp

#include <boost/asio.hpp>
#include <fstream>
#include <iostream>

using namespace boost;

class SyncTCPClient {
  public:
    SyncTCPClient(const std::string &raw_ip_address, unsigned short port_num)
            : m_ep(asio::ip::address::from_string(raw_ip_address), port_num), m_sock(m_ios) {
        m_sock.open(m_ep.protocol());
    }
    ~SyncTCPClient() { close(); }

    void connect() { m_sock.connect(m_ep); }

    std::string emulateLongComputationOp(unsigned int duration_sec) {
        std::string request = "EMULATE_LONG_COMP_OP " + std::to_string(duration_sec) + "\n";
        sendRequest(request);
        return receiveResponse();
    }

  private:
    void close() {
        if (m_sock.is_open()) {
            std::ofstream log("client.log", std::ios_base::app | std::ios_base::out);
            log << "shutting down" << std::endl;
            m_sock.shutdown(asio::ip::tcp::socket::shutdown_both);
            log << "closing the socket" << std::endl;
            m_sock.close();
            log << "socket closed" << std::endl;
        }
    }

    void sendRequest(const std::string &request) { asio::write(m_sock, asio::buffer(request)); }

    std::string receiveResponse() {
        asio::streambuf buf;
        asio::read_until(m_sock, buf, '\n');

        std::string response;
        getline(std::istream(&buf), response);

        return response;
    }

  private:
    asio::io_service m_ios;
    asio::ip::tcp::endpoint m_ep;
    asio::ip::tcp::socket m_sock;
};

int main() {
    const std::string raw_ip_address = "127.0.0.1";
    const unsigned short port_num = 3333;

    try {
        SyncTCPClient client{ raw_ip_address, port_num };

        // Sync connect.
        client.connect();

        std::cout << "Sending request to the server...\n";
        std::string response = client.emulateLongComputationOp(10);

        std::cout << "Response received: " << response << std::endl;
    } catch (std::system_error &e) {
        std::ofstream log("client.log", std::ios_base::app | std::ios_base::out);
        log << e.what() << " " << e.code() << ": " << e.code().message() << std::endl;
        return e.code().value();
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

ASIO 示例代码在应该之前关闭套接字 的相关文章

随机推荐

  • 将混合嵌套列表转换为嵌套元组

    如果我有 easy nested list foo bar foofoo barbar 并希望拥有 foo bar foofoo barbar I can do tuple tuple i for i in easy nested list
  • 创建自定义 NSAttributedString.Key

    我正在尝试构建一个简单的笔记应用程序 目前 我关注的是使用不同文本样式设置文本的可能性 例如正文 标题 粗体 斜体等 我用了一个NSAttributedString设置不同的文本样式 现在 我想检测所选文本应用了哪种样式 我认为一个好方法是
  • Bash 实时读取 STDOUT 流

    我已经搜索过这个并期望找到数百个解决方案 但没有找到 我想读取 STDOUT 流并等待特定字符串出现 而不等待该过程完成 我现在所拥有的 等待该过程完成后再返回输出 RESP execute some command 2 gt 1 if R
  • 如何让 jQuery Sticky Float 插件反应动态页面高度变化?

    我目前正在使用 StickyFloathttp plugins jquery com project stickyfloat我通过 jQuery 动态更改对象所在的 div 的高度 CSS 高度的更改效果很好 但 StickyFloat 无
  • 使用 @JsonView 排除(如 @JsonIgnore)与 play 框架默认 json writer?

    看来你不能混合 JsonIgnore和 JsonView 我想默认隐藏一个字段 但在某些情况下显示它 基本上我已经有了这个设置 class Parent extends Model public Long id public Child c
  • 如何重构数千行Java代码?有没有可用的工具?

    在我们的应用程序中 我们有两个或三个类 其中包含整个 Java Swing 应用程序逻辑 这两个或三个类包含大约 7k 行代码 现在我被分配了重构这段 Java 代码的任务 我该如何开始 有没有可用的工具可以进行重构或至少指导我们 我推荐
  • 使用 ASP.NET MVC 的 HttpHandler

    如果我有一个标准 AXD HttpHandler 和 axd 的默认忽略路由 那么为什么 ASP NET MVC 仍然处理子目录中的请求 例如 如果有针对 Content Css css 的请求 axd d 如果请求是在 root css
  • 为什么这个 .equals() 代码示例返回“false”? [复制]

    这个问题在这里已经有答案了 考虑 class Dog int height int weight String name public class DogTest public static void main String args Do
  • MySQL将CHAR(32)数据类型转换为BINARY(16)而不丢失数据

    嗨 我有一张桌子 其中有一列char 32 数据类型 我需要将其转换为BINARY 16 数据类型 我尝试过更改列类型 但这会删除列中的所有数据 以下代码是我更新列的数据类型的方法 这导致我丢失了该列中的所有数据 ALTER TABLE t
  • 我的代码的 Boost 更新问题

    我最近将 boost 更新到 1 59 并安装在 usr local 中 我的系统默认安装在 usr 并且是1 46 我使用的是ubuntu 12 04 我的代码库使用 ROS Hydro 机器人操作系统 我有一个相当大的代码库 在更新之前
  • 是(n+1)!按照 (n!) 的顺序?你能给我一个证明吗?

    那 n 1 呢 另外 如果你能给我一个证明 可以帮助我更好地理解 我被困在这一点上 证明 n 1 在 O n 中 你必须证明存在一个常数 c 以便对于所有足够大的 n n gt n0 不等式 n 1 lt c n 成立 然而 由于 n 1
  • 使用 jquery 将下拉菜单链接到锚文本

    我在我的页面上选择下拉菜单
  • 未找到名为 os 的模块 - Django、mod_wsgi、Apache 2.2

    我正在尝试设置 apache mod wsgi 和 django 我的 apache 错误日志中出现了内部服务器错误 Wed Jun 22 21 31 55 2011 error client 1 mod wsgi pid 2893 Tar
  • 将 Json 文件内容保存到 python/pandas 中的 CSV 文件

    如何将 数据 信息放入最后所示的 csv 表中 以及正确的 标头 以便源服务器不会让我以为我正在抓取数据 到目前为止我写的代码如下 import requests json headers User Agent Mozilla 5 0 da
  • 令人困惑的宏和枚举定义

    我正在浏览一些 Route netlink 源代码 我想弄清楚 RTNLGRP NEIGH 的值是多少 Source http lxr free electrons com source include linux rtnetlink h
  • 使用 OpenGLES 抗锯齿去除绳索的锯齿状边缘

    我已经实现了绳索 其中我使用 Revolute 关节连接动态 b2bodied 现在我成功创建了这条绳索 但我的绳索看起来不光滑 我希望它们像丝带一样光滑 任何对此有想法的人 我发现它可以通过 openGLES 使用抗锯齿来实现 但仍然不知
  • PHP SimpleXML 大文件没有额外的内存使用

    在每一篇有关 SimpleXML 性能和内存使用的文章中 都会提到所有解析的内容都存储在内存中 处理大文件将导致大量的内存使用 但最近我发现使用 SimpleXML 处理大文件不会导致大量内存使用 甚至几乎不会导致内存使用 有我的测试脚本
  • 删除 Azure 资源组中年龄超过 x 天的所有资源

    我尝试在资源组 python api 中的资源上 扩展 creationTime 这样我就可以找到它的年龄 如果 gt max age days 我就会删除资源组中的资源 但creationTime似乎在资源上不可用 是否有另一种方法可以根
  • 用多种颜色为轴刻度文本着色

    我正在尝试绘制一个heatmap using R s plotly包 我希望为 y 轴刻度文本的特定标签设置特定的颜色 这是一个示例数据集 set seed 1 df lt reshape2 melt matrix rnorm 100 10
  • ASIO 示例代码在应该之前关闭套接字

    我需要一个使用 ASIO 的并行同步 TCP 解决方案 我正在尝试从这些示例中获取示例代码 https github com jvillasante asio network programming cookbook tree master