1. ThreadPool.hpp
#pragma once
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <vector>
#include <queue>
#include <thread>
#include <atomic>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
namespace haifan
{
#define MAX_THREAD_NUM 256
class ThreadPool
{
using Task = std::function<void()>;
// 线程池
std::vector<std::thread> pool;
// 任务队列
std::queue<Task> tasks;
// 同步
std::mutex m_lock;
// 条件阻塞
std::condition_variable cv_task;
// 是否关闭提交
std::atomic<bool> stoped;
//空闲线程数量
std::atomic<int> idlThrNum;
public:
inline ThreadPool(unsigned short size = 4) :stoped{ false }
{
idlThrNum = size < 1 ? 1 : size;
for (size = 0; size < idlThrNum; ++size)
{ //初始化线程数量
pool.emplace_back(
[this]
{ // 工作线程函数
while(!this->stoped)
{
std::function<void()> task;
{ // 获取一个待执行的 task
std::unique_lock<std::mutex> lock{ this->m_lock };// unique_lock 相比 lock_guard 的好处是:可以随时 unlock() 和 lock()
this->cv_task.wait(lock,
[this] {
return this->stoped.load() || !this->tasks.empty();
}
); // wait 直到有 task
if (this->stoped && this->tasks.empty())
return;
task = std::move(this->tasks.front()); // 取一个 task
this->tasks.pop();
}
idlThrNum--;
task();
idlThrNum++;
}
}
);
}
}
inline ~ThreadPool()
{
stoped.store(true);
cv_task.notify_all(); // 唤醒所有线程执行
for (std::thread& thread : pool) {
//thread.detach(); // 让线程“自生自灭”
if(thread.joinable())
thread.join(); // 等待任务结束, 前提:线程一定会执行完
}
}
public:
// 提交一个任务
// 调用.get()获取返回值会等待任务执行完,获取返回值
// 有两种方法可以实现调用类成员,
// 一种是使用 bind: .commit(std::bind(&Dog::sayHello, &dog));
// 一种是用 mem_fn: .commit(std::mem_fn(&Dog::sayHello), &dog)
template<class F, class... Args>
auto push_task(F&& f, Args&&... args) ->std::future<decltype(f(args...))>
{
if (stoped.load()) // stop == true ??
throw std::runtime_error("commit on ThreadPool is stopped.");
using RetType = decltype(f(args...)); // typename std::result_of<F(Args...)>::type, 函数 f 的返回值类型
auto task = std::make_shared<std::packaged_task<RetType()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
); // wtf !
std::future<RetType> future = task->get_future();
{ // 添加任务到队列
std::lock_guard<std::mutex> lock{ m_lock };//对当前块的语句加锁 lock_guard 是 mutex 的 stack 封装类,构造的时候 lock(),析构的时候 unlock()
tasks.emplace(
[task]()
{ // push(Task{...})
(*task)();
}
);
}
cv_task.notify_one(); // 唤醒一个线程执行
return future;
}
//空闲线程数量
int idlCount() { return idlThrNum; }
};
}
#endif
2. main.cpp
#include "mysql_connection.h"
#include <string>
#include <iostream>
#include <sstream>
#include <cppconn/driver.h>
#include <cppconn/exception.h>
#include <cppconn/resultset.h>
#include <cppconn/statement.h>
#include "threadpool.hpp"
#include <math.h>
using namespace std;
using namespace haifan;
int read_dat_count() {
int count = 0;
try
{
sql::Driver *driver;
sql::Connection *con;
sql::Statement *stmt;
sql::ResultSet *res;
/* Create a connection */
driver = get_driver_instance();
con = driver->connect("tcp://10.0.0.31:3306", "root", "haifan123!@#");
/* Connect to the MySQL test database */
con->setSchema("hf_face_sys");
stmt = con->createStatement();
res = stmt->executeQuery("select count(*) from faces_bigdata;");
while (res->next())
{
//cout << "\t... MySQL replies: ";
/* Access column data by alias or column name */
//cout << res->getString(1) << endl;
count = res->getInt64(1);
}
std::cout << "this_thread id = " << std::this_thread::get_id() << std::endl;
delete res;
delete stmt;
delete con;
}
catch (sql::SQLException &e)
{
cout << "# ERR: SQLException in " << __FILE__;
cout << "(" << __FUNCTION__ << ") on line " << __LINE__ << endl;
cout << "# ERR: " << e.what();
cout << " (MySQL error code: " << e.getErrorCode();
cout << ", SQLState: " << e.getSQLState() << " )" << endl;
}
return count;
}
int read_sql_dat(int start, int limit) {
try
{
sql::Driver *driver;
sql::Connection *con;
sql::Statement *stmt;
sql::ResultSet *res;
/* Create a connection */
driver = get_driver_instance();
con = driver->connect("tcp://10.0.0.31:3306", "root", "haifan123!@#");
/* Connect to the MySQL test database */
con->setSchema("hf_face_sys");
std::cout << "start " << start <<" -> " << limit << std::endl;
stmt = con->createStatement();
string sql = "select * from faces_bigdata limit " + to_string(start) + "," + to_string(limit);
res = stmt->executeQuery(sql.c_str());
bool first = true;
string last;
while (res->next())
{
if(first) {
cout << "first id = " << res->getString(1) << endl;
first = false;
}
last = res->getString(1);
}
cout << "last id = " << last << endl;
std::cout << "this_thread id = " << std::this_thread::get_id() << std::endl;
delete res;
delete stmt;
delete con;
}
catch (sql::SQLException &e)
{
cout << "# ERR: SQLException in " << __FILE__;
cout << "(" << __FUNCTION__ << ") on line " << __LINE__ << endl;
cout << "# ERR: " << e.what();
cout << " (MySQL error code: " << e.getErrorCode();
cout << ", SQLState: " << e.getSQLState() << " )" << endl;
}
std::ostringstream oss;
oss << std::this_thread::get_id();
std::string stid = oss.str();
unsigned long long tid = std::stoull(stid);
return tid;
}
int main(void)
{
ThreadPool *executor = new ThreadPool(4);
int count = read_dat_count();
std::cout << "count = " << count << std::endl;
int limit = ceil(count / 4);
std::cout << "limit = " << limit << std::endl;
auto res1 = executor->push_task(read_sql_dat, 0, limit);
auto res2 = executor->push_task(read_sql_dat, limit, limit);
auto res3 = executor->push_task(read_sql_dat, limit * 2, limit);
auto res4 = executor->push_task(read_sql_dat, limit * 3, limit);
int r1 = res1.get();
int r2 = res2.get();
int r3 = res3.get();
int r4 = res4.get();
std::cout << "res1 = " << r1 << std::endl;
std::cout << "res2 = " << r2 << std::endl;
std::cout << "res3 = " << r3 << std::endl;
std::cout << "res4 = " << r4 << std::endl;
return EXIT_SUCCESS;
}
3. CMakeLists.txt
project(redis-test)
cmake_minimum_required(VERSION 2.6.0 FATAL_ERROR)
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set (CMAKE_EXE_LINKER_FLAGS_RELEASE "${CMAKE_EXE_LINKER_FLAGS_RELEASE} ")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3 -std=c++11 -Wall ")
set ( CMAKE_EXE_LINKER_FLAGS_DEBUG "${CMAKE_EXE_LINKER_FLAGS_DEBUG} -fsanitize=address -fsanitize=undefined ")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -std=c++11 -Wall -O0 -fno-omit-frame-pointer -fsanitize=address -fsanitize=undefined -D__FORTIFY_SOURCE=2 -rdynamic -DDEBUG")
include_directories("~/haifan/zhouyong/c++/redis-test/")
include_directories("~/haifan/zhouyong/c++/redis-test/redis-plus-plus/src/")
include_directories("~/haifan/zhouyong/c++/redis-test/redis-plus-plus/src/sw/redis++/no_tls")
link_directories("/home/haifan/haifan/zhouyong/c++/redis-test/redis-plus-plus/build/")
link_directories("/home/haifan/haifan/zhouyong/c++/redis-test/hiredis/")
find_package(OpenCV REQUIRED)
include(CMakeToolsHelpers OPTIONAL)
include_directories(${OpenCV_INCLUDE_DIRS})
add_executable(multi-mysql-test main.cpp)
# add_executable(hfTest test.cpp)
# add_executable(hfTest2 test2.cpp)
target_link_libraries(multi-mysql-test glog cryptopp ${OpenCV_LIBS} uuid mysqlcppconn jsoncpp pthread redis++ hiredis)