如何从 std::vector 自动删除已完成的 future

2024-04-28

在下面的示例中,mEventExecutors 是一个std::vector<std::future<void>>。我希望能够在完成后从向量中删除未来。这可以做到吗?

void RaiseEvent(EventID messageID)
{
    mEventExecutors.push_back(std::move(std::async([=]{
            auto eventObject = mEventListeners.find(messageID);
            if (eventObject != mEventListeners.end())
            {
                for (auto listener : eventObject->second)
                {
                    listener();
                }
            }
        })
    ));
}

这个问题本身已经被另一个人回答了,但它激起了我的好奇心,想知道如何用最少的代码行实现一个功能齐全、线程安全的任务管理器。

我还想知道是否可以将任务作为未来等待,或者可以选择提供回调函数。

当然,这引出了一个问题:这些 future 是否可以使用性感的延续语法.then(xxx)而不是阻塞代码。

这是我的尝试。

非常感谢克里斯托弗·科尔霍夫 (Christopher Kohlhoff),该书的作者boost::asio。通过研究他出色的工作,我了解到将类别分为以下几类的价值:

  • 句柄 - 控制对象的生命周期
  • 服务 - 提供对象逻辑、在对象实现之间共享的状态,并管理实现对象的生命周期(如果它们比句柄的生命周期长)(任何依赖于回调的事物通常都会这样做),以及
  • 实现提供每个对象的状态。

那么下面是调用代码的示例:

int main() {
    task_manager mgr;

    // an example of using async callbacks to indicate completion and error
    mgr.submit([] {
                   emit("task 1 is doing something");
                   std::this_thread::sleep_for(1s);
                   emit("task 1 done");
               },
               [](auto err) {
                   if (not err) {
                       emit("task 1 completed");
                   } else {
                       emit("task 1 failed");
                   }
               });

    // an example of returning a future (see later)
    auto f = mgr.submit([] {
        emit("task 2 doing something");
        std::this_thread::sleep_for(1500ms);
        emit("task 2 is going to throw");
        throw std::runtime_error("here is an error");
    }, use_future);

    // an example of returning a future and then immediately using its continuation.
    // note that the continuation happens on the task_manager's thread pool
    mgr.submit([]
               {
                   emit("task 3 doing something");
                   std::this_thread::sleep_for(500ms);
                   emit("task 3 is done");
               },
               use_future)
            .then([](auto f) {
                try {
                    f.get();
                }
                catch(std::exception const& e) {
                    emit("task 3 threw an exception: ", e.what());
                }
            });

    // block on the future of the second example
    try {
        f.get();
    }
    catch (std::exception &e) {
        emit("task 2 threw: ", e.what());
    }
}

这将导致以下输出:

task 1 is doing something
task 2 doing something
task 3 doing something
task 3 is done
task 1 done
task 1 completed
task 2 is going to throw
task 2 threw: here is an error

这是完整的代码(在 apple clang 上测试,它比 gcc 更混杂,所以如果我在 lambda 中错过了 this-> ,我很抱歉):

#define BOOST_THREAD_PROVIDES_FUTURE 1
#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION 1
#define BOOST_THREAD_PROVIDES_EXECUTORS 1

/* written by Richard Hodges 2017
 * You're free to use the code, but please give credit where it's due :)
 */
#include <boost/thread/future.hpp>
#include <boost/thread/executors/basic_thread_pool.hpp>
#include <thread>
#include <utility>
#include <unordered_map>
#include <stdexcept>
#include <condition_variable>

// I made a task an object because I thought I might want to store state in it.
// it turns out that this is not strictly necessary

struct task {

};

/*
 * This is the implementation data for one task_manager
 */
struct task_manager_impl {

    using mutex_type = std::mutex;
    using lock_type = std::unique_lock<mutex_type>;

    auto get_lock() -> lock_type {
        return lock_type(mutex_);
    }

    auto add_task(lock_type const &lock, std::unique_ptr<task> t) {
        auto id = t.get();
        task_map_.emplace(id, std::move(t));
    }

    auto remove_task(lock_type lock, task *task_id) {
        task_map_.erase(task_id);
        if (task_map_.empty()) {
            lock.unlock();
            no_more_tasks_.notify_all();
        }
    }

    auto wait(lock_type lock) {
        no_more_tasks_.wait(lock, [this]() { return task_map_.empty(); });
    }

    // for this example I have chosen to express errors as exceptions
    using error_type = std::exception_ptr;

    mutex_type mutex_;
    std::condition_variable no_more_tasks_;


    std::unordered_map<task *, std::unique_ptr<task>> task_map_;
};

/*
 * This stuff is the protocol to figure out whether to return a future
 * or just invoke a callback.
 * Total respect to Christopher Kohlhoff of asio fame for figuring this out
 * I merely step in his footsteps here, with some simplifications because of c++11
 */
struct use_future_t {
};
constexpr auto use_future = use_future_t();

template<class Handler>
struct make_async_handler {
    auto wrap(Handler handler) {
        return handler;
    }

    struct result_type {
        auto get() -> void {}
    };

    struct result_type result;
};

template<>
struct make_async_handler<const use_future_t &> {
    struct shared_state_type {
        boost::promise<void> promise;
    };

    make_async_handler() {
    }

    template<class Handler>
    auto wrap(Handler &&) {
        return [shared_state = this->shared_state](auto error) {
            // boost promises deal in terms of boost::exception_ptr so we need to marshal.
            // this is a small price to pay for the extra utility of boost::promise over
            // std::promise
            if (error) {
                try {
                    std::rethrow_exception(error);
                }
                catch (...) {
                    shared_state->promise.set_exception(boost::current_exception());
                }
            } else {
                shared_state->promise.set_value();
            }
        };
    }


    struct result_type {
        auto get() -> boost::future<void> { return shared_state->promise.get_future(); }

        std::shared_ptr<shared_state_type> shared_state;
    };

    std::shared_ptr<shared_state_type> shared_state = std::make_shared<shared_state_type>();
    result_type result{shared_state};

};

/*
 * Provides the logic of a task manager. Also notice that it maintains a boost::basic_thread_pool
 * The destructor of a basic_thread_pool will not complete until all tasks are complete. So our
 * program will not crash horribly at exit time.
 */
struct task_manager_service {

    /*
     * through this function, the service has full control over how it is created and destroyed.
     */

    static auto use() -> task_manager_service&
    {
        static task_manager_service me {};
        return me;
    }

    using impl_class = task_manager_impl;

    struct deleter {
        void operator()(impl_class *p) {
            service_->destroy(p);
        }

        task_manager_service *service_;
    };

    /*
     * defining impl_type in terms of a unique_ptr ensures that the handle will be
     * moveable but not copyable.
     * Had we used a shared_ptr, the handle would be copyable with shared semantics.
     * That can be useful too.
     */
    using impl_type = std::unique_ptr<impl_class, deleter>;

    auto construct() -> impl_type {
        return impl_type(new impl_class(),
                         deleter {this});
    }

    auto destroy(impl_class *impl) -> void {
        wait(*impl);
        delete impl;
    }

    template<class Job, class Handler>
    auto submit(impl_class &impl, Job &&job, Handler &&handler) {

        auto make_handler = make_async_handler<Handler>();


        auto async_handler = make_handler.wrap(std::forward<Handler>(handler));

        auto my_task = std::make_unique<task>();
        auto task_ptr = my_task.get();

        auto task_done = [
                this,
                task_id = task_ptr,
                &impl,
                async_handler
        ](auto error) {
            async_handler(error);
            this->remove_task(impl, task_id);
        };
        auto lock = impl.get_lock();
        impl.add_task(lock, std::move(my_task));
        launch(impl, task_ptr, std::forward<Job>(job), task_done);

        return make_handler.result.get();
    };

    template<class F, class Handler>
    auto launch(impl_class &, task *task_ptr, F &&f, Handler &&handler) -> void {
        this->thread_pool_.submit([f, handler] {
            auto error = std::exception_ptr();
            try {
                f();
            }
            catch (...) {
                error = std::current_exception();
            }
            handler(error);
        });
    }


    auto wait(impl_class &impl) -> void {
        impl.wait(impl.get_lock());
    }

    auto remove_task(impl_class &impl, task *task_id) -> void {
        impl.remove_task(impl.get_lock(), task_id);
    }


    boost::basic_thread_pool thread_pool_{std::thread::hardware_concurrency()};

};

/*
 * The task manage handle. Holds the task_manager implementation plus provides access to the
 * owning task_manager_service. In this case, the service is a global static object. In an io loop environment
 * for example, asio, the service would be owned by the io loop.
 */
struct task_manager {

    using service_type = task_manager_service;
    using impl_type = service_type::impl_type;
    using impl_class = decltype(*std::declval<impl_type>());

    task_manager()
            : service_(std::addressof(service_type::use()))
            , impl_(get_service().construct()) {}

    template<class Job, class Handler>
    auto submit(Job &&job, Handler &&handler) {
        return get_service().submit(get_impl(),
                                    std::forward<Job>(job),
                                    std::forward<Handler>(handler));
    }

    auto get_service() -> service_type & {
        return *service_;
    }

    auto get_impl() -> impl_class & {
        return *impl_;
    }

private:

    service_type* service_;
    impl_type impl_;
};


/*
 * helpful thread-safe emitter
 */
std::mutex thing_mutex;

template<class...Things>
void emit(Things &&...things) {
    auto lock = std::unique_lock<std::mutex>(thing_mutex);
    using expand = int[];
    void(expand{0,
                ((std::cout << things), 0)...
    });
    std::cout << std::endl;
}

using namespace std::literals;

int main() {
    task_manager mgr;

    // an example of using async callbacks to indicate completion and error
    mgr.submit([] {
                   emit("task 1 is doing something");
                   std::this_thread::sleep_for(1s);
                   emit("task 1 done");
               },
               [](auto err) {
                   if (not err) {
                       emit("task 1 completed");
                   } else {
                       emit("task 1 failed");
                   }
               });

    // an example of returning a future (see later)
    auto f = mgr.submit([] {
        emit("task 2 doing something");
        std::this_thread::sleep_for(1500ms);
        emit("task 2 is going to throw");
        throw std::runtime_error("here is an error");
    }, use_future);

    // an example of returning a future and then immediately using its continuation.
    // note that the continuation happens on the task_manager's thread pool
    mgr.submit([] {
                   emit("task 3 doing something");
                   std::this_thread::sleep_for(500ms);
                   emit("task 3 is done");
               },
               use_future)
            .then([](auto f) {
                try {
                    f.get();
                }
                catch (std::exception const &e) {
                    emit("task 3 threw an exception: ", e.what());
                }
            });

    // block on the future of the second example
    try {
        f.get();
    }
    catch (std::exception &e) {
        emit("task 2 threw: ", e.what());
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何从 std::vector 自动删除已完成的 future 的相关文章

  • 如何让 Swagger 插件在自托管服务堆栈中工作

    我已经用 github 上提供的示例重新提出了这个问题 并为任何想要自己运行代码的人提供了一个下拉框下载链接 Swagger 无法在自托管 ServiceStack 服务上工作 https stackoverflow com questio
  • 计算 Richtextbox 中所有单词的最有效方法是什么?

    我正在编写一个文本编辑器 需要提供实时字数统计 现在我正在使用这个扩展方法 public static int WordCount this string s s s TrimEnd if String IsNullOrEmpty s re
  • ClickOnce 应用程序错误:部署和应用程序没有匹配的安全区域

    我在 IE 中使用 FireFox 和 Chrome 的 ClickOnce 应用程序时遇到问题 它工作正常 异常的详细信息是 PLATFORM VERSION INFO Windows 6 1 7600 0 Win32NT Common
  • 在 C 中匹配二进制模式

    我目前正在开发一个 C 程序 需要解析一些定制的数据结构 幸运的是我知道它们是如何构造的 但是我不确定如何在 C 中实现我的解析器 每个结构的长度都是 32 位 并且每个结构都可以通过其二进制签名来识别 举个例子 有两个我感兴趣的特定结构
  • 使用 LINQ2SQL 在 ASP.NET MVC 中的各种模型存储库之间共享数据上下文

    我的应用程序中有 2 个存储库 每个存储库都有自己的数据上下文对象 最终结果是我尝试将从一个存储库检索到的对象附加到从另一个存储库检索到的对象 这会导致异常 Use 构造函数注入将 DataContext 注入每个存储库 public cl
  • 如何创建包含 IPv4 地址的文本框? [复制]

    这个问题在这里已经有答案了 如何制作一个这样的文本框 我想所有的用户都见过这个并且知道它的功能 您可以使用带有 Mask 的 MaskedTestBox000 000 000 000 欲了解更多信息 请参阅文档 http msdn micr
  • 获取两个工作日之间的天数差异

    这听起来很简单 但我不明白其中的意义 那么获取两次之间的天数的最简单方法是什么DayOfWeeks当第一个是起点时 如果下一个工作日较早 则应考虑在下周 The DayOfWeek 枚举 http 20 20 5B1 5D 3a 20htt
  • 使用接口有什么好处?

    使用接口有什么用 我听说它用来代替多重继承 并且还可以用它来完成数据隐藏 还有其他优点吗 哪些地方使用了接口 程序员如何识别需要该接口 有什么区别explicit interface implementation and implicit
  • 在 C 中初始化变量

    我知道有时如果你不初始化int 如果打印整数 您将得到一个随机数 但将所有内容初始化为零似乎有点愚蠢 我问这个问题是因为我正在评论我的 C 项目 而且我对缩进非常直接 并且它可以完全编译 90 90 谢谢 Stackoverflow 但我想
  • 如何检测表单的任何控件的变化?

    如何检测 C 中表单的任何控件的更改 由于我在一个表单上有许多控件 并且如果表单中的任何控件值发生更改 我需要禁用按钮 我正在寻找一些内置函数 事件处理程序 属性 并且不想为此创建自定义函数 不 我不知道任何时候都会触发任何事件any控制表
  • C#:帮助理解 UML 类图中的 <>

    我目前正在做一个项目 我们必须从 UML 图编写代码 我了解 UML 类图的剖析 但我无法理解什么 lt
  • 外键与独立关系 - Entity Framework 5 有改进吗?

    我读过了several http www ladislavmrnka com 2011 05 foreign key vs independent associations in ef 4 文章和问题 https stackoverflow
  • CMake 无法确定目标的链接器语言

    首先 我查看了this https stackoverflow com questions 11801186 cmake unable to determine linker language with c发帖并找不到解决我的问题的方法 我
  • 如何在非控制台应用程序中查看 cout 输出?

    输出到调试窗口似乎相当繁琐 我在哪里可以找到cout如果我正在编写非控制台信息 则输出 Like double i a b cout lt lt b lt lt endl I want to check out whether b is z
  • 使用 %d 打印 unsigned long long

    为什么我打印以下内容时得到 1 unsigned long long int largestIntegerInC 18446744073709551615LL printf largestIntegerInC d n largestInte
  • 不同类型指针之间的减法[重复]

    这个问题在这里已经有答案了 我试图找到两个变量之间的内存距离 具体来说 我需要找到 char 数组和 int 之间的距离 char data 5 int a 0 printf p n p n data 5 a long int distan
  • 调用堆栈中的“外部代码”是什么意思?

    我在 Visual Studio 中调用一个方法 并尝试通过检查调用堆栈来调试它 其中一些行标记为 外部代码 这到底是什么意思 方法来自 dll已被处决 外部代码 意味着该dll没有可用的调试信息 你能做的就是在Call Stack窗口中单
  • Oracle Data Provider for .NET 不支持 Oracle 19.0.48.0.0

    我们刚刚升级到 Oracle 19c 19 3 0 所有应用程序都停止工作并出现以下错误消息 Oracle Data Provider for NET 不支持 Oracle 19 0 48 0 0 我将 Oracle ManagedData
  • 使用 .NET Process.Start 运行时挂起进程 - 出了什么问题?

    我在 svn exe 周围编写了一个快速而肮脏的包装器来检索一些内容并对其执行某些操作 但对于某些输入 它偶尔会重复挂起并且无法完成 例如 一个调用是 svn list svn list http myserver 84 svn Docum
  • 如何从 ODBC 连接获取可用表的列表?

    在 Excel 中 我可以转到 数据 gt 导入外部数据 gt 导入数据 然后选择要使用的数据源 然后在提供登录信息后 它会给我一个表格列表 我想知道如何使用 C 以编程方式获取该列表 您正在查询什么类型的数据源 SQL 服务器 使用权 看

随机推荐