I'm trying to use a Boost thread group to execute various SQL statements on different relational databases.
The code below does not perform the update as expected, yet when the function is run outside the thread pool, it executes as expected.
Code:
#include <exception>
#include <iostream>
#include <memory>
#include <string>

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <SQLAPI/include/SQLAPI.h>
class thread_pool {
private:
boost::asio::io_service ios;
boost::asio::io_service::work work_ios;
boost::thread_group thread_grp;
unsigned short threads_free;
boost::mutex mx;
public:
// Constructor.
thread_pool( int pool_size = 0 ) : work_ios( ios ), threads_free( pool_size ) {
if(pool_size>0)
pool_size = boost::thread::hardware_concurrency()*2; // default to double number of cores
for ( int i = 0; i < pool_size; ++i )
thread_grp.create_thread( boost::bind( &boost::asio::io_service::run, &ios ) );
}
~thread_pool() {
ios.stop(); // Ensure all threads in ios::run() are stopped
try { thread_grp.join_all(); }
catch ( const std::exception& ) {}
}
// if thread is free, add a job
template < typename Job >
void run_job( Job job ) {
boost::unique_lock< boost::mutex > lock( mx );
if ( 0 == threads_free ) return; // exit if no available threads
--threads_free; // Decrement thread count
ios.dispatch(boost::bind( &thread_pool::wrapper, this, boost::function< void() >( job ) ));
// ios.post( boost::bind( &thread_pool::wrapper, this, boost::function< void() >( job ) ) );
}
private:
// Called from run_job
// Wrap the job and ensure thread free count is incremented safely
void wrapper( boost::function< void() > job ) {
try { job(); } // executes the job
catch ( const std::exception& ) {}
// increment available threads, once job finished
boost::unique_lock< boost::mutex > lock( mx );
++threads_free;
}
};
// Executes a single SQL statement in its own serializable transaction
// against the PostgreSQL database configured below.
//
// sql:     the statement text to execute.
// returns: the number of rows affected once the transaction has been
//          committed, or 0 if any step failed (the transaction is rolled
//          back and the error text is written to std::cout).
long exec_sql(const std::string & sql) {
    SAConnection con; // connection object
    long ret{0};
    try {
        con.Connect("localhost,5432@ft_node", "bluefrog", "bluefrog", SA_PostgreSQL_Client);
        con.setAutoCommit(SA_AutoCommitOff);
        con.setIsolationLevel(SA_Serializable);
        SACommand cmd1(&con, sql.c_str());
        cmd1.Execute();
        const long affected = cmd1.RowsAffected();
        con.Commit();
        // Report the row count only after the commit succeeded, so a failed
        // commit is not mistaken for a successful update.
        ret = affected;
    }
    catch(SAException &x) {
        // Autocommit is off: undo whatever part of the transaction ran.
        // Rollback itself may fail (e.g. the connection was never opened).
        try { con.Rollback(); }
        catch(SAException &) {}
        std::cout << (const char*)x.ErrText() << "\n";
    }
    // Disconnect can also throw; keep that from escaping this function.
    try {
        if (con.isConnected())
            con.Disconnect();
    }
    catch(SAException &x) {
        std::cout << (const char*)x.ErrText() << "\n";
    }
    return ret;
}
int main () {
// long ret = exec_sql("update test1 set y = 'Yellow' where x in(2,3,4)");
// std::cout << "return " << ret << "\n";
thread_pool tp{4};
tp.run_job( boost::bind( exec_sql, std::string("update test1 set y = 'Magenta' where x in(2,3,4)") ) );
return 0;
};
Can anybody suggest what the problem might be?