Hi all, I have a query about multithreading. What I would like to do is, at the start of my main update() function, start a couple of threads in parallel and, once they have all completed, carry on with the rest of my main update function.

Code:
void update() {
   thread1->update();  // fluid solver
   thread2->update();  // particle system
   thread3->update();  // camera1 optical flow
   thread4->update();  // camera2 optical flow
   // all of the above threads have their own data and work independently

   etc.
   .
   .. wait till all the above threads are done.
   .. once they are done, feed the data from each of them into other ones
   .. e.g. fluid velocity info is added to particle systems ready for next frame's update
   .. e.g. camera optical flow data is fed into fluid solver, ready for next frame's update
   
   render all
}
I am having trouble getting this working the way I want. I don't want my threads to loop continuously independent of my main update loop, I want them all to start at the beginning of my update loop, and then once they are all finished, feed the data around, then render, then repeat.

I tried creating a thread every update loop which runs only once (no while() in the function) - it works and behaves fine for a few minutes, then the threads just stop being created - no crash or hang, just no updates from the threads. I also tried using an infinite while() with a flag that determines whether the code runs that frame or not, and only setting that flag at the beginning of the main update loop (code for this below) - that one is very stable, but the behaviour was weird; I think the thread updates weren't being called when I wanted them to be.

I've read a lot of documentation on multithreading in general and all the pthread commands. I understand the fundamentals of protecting data and all that.. but I'm just lacking a lot of experience and strategy .. so any suggestions welcome!


This is my thread class I'm using which has the infinite loop in the threaded function, but only runs the update function if the flag is set. I initialize the flag at the beginning of each main update with thread1->runOnce(); thread2->runOnce(); etc.... but it doesn't run smoothly.

Code:
#include "ofxThread.h"

// Worker thread that calls update() either continuously (auto-loop mode) or
// once per runOnce() request. In run-once mode a main loop can call runOnce()
// on several workers, then waitForFinish() on each, and only continue once
// every requested update() has actually completed.
//
// All three state members below are only read/written while holding the
// ofxThread lock once the thread has been started.
class MSAThread : public ofxThread {

	bool bHasRunThisFrame;	// true when no update() request is pending
	bool bAutoLoop;			// true: keep calling update() without waiting for runOnce()
	int interval;			// pause between auto-loop iterations, in milliseconds

public:

	// Create the thread. If initAutoLoop is true it starts calling update()
	// straight away, paced at roughly frameRate calls per second; otherwise it
	// idles until runOnce() is called.
	void start(int frameRate = 30, bool initAutoLoop = false){
		if(frameRate < 1) frameRate = 1;	// avoid division by zero / negative sleep times
		interval = 1000/frameRate;
		bAutoLoop = initAutoLoop;
		// Start with nothing pending. Always initialised (the flag is ignored
		// while auto-looping) so it never holds an indeterminate value.
		bHasRunThisFrame = true;
		startThread(true, false);   // blocking, verbose
	}

	// Request exactly one update() on the worker thread. Also switches the
	// worker out of auto-loop mode.
	void runOnce() {
		if(lock()) {
			bAutoLoop = false;
			bHasRunThisFrame = false;
			unlock();
		}
	}

	// Body of the worker thread: runs update() whenever auto-looping or when a
	// runOnce() request is pending. update() is called with the lock held, so
	// anyone calling lock() (including waitForFinish()) blocks until it is done.
	void threadedFunction() {
		while(isThreadRunning()) {
			if(!lock()) continue;	// lock() refuses once the thread has been stopped

			const bool looping = bAutoLoop;
			const bool pending = looping || !bHasRunThisFrame;
			if(pending) {
				update();
				bHasRunThisFrame = true;
			}
			unlock();

			if(looping) {
				// Frame-rate pacing only makes sense when free-running. In
				// run-once mode the caller drives the timing, and sleeping here
				// would delay the response to the next runOnce().
				ofSleepMillis(interval);
			} else if(!pending) {
				// Nothing to do: back off briefly instead of spinning on the
				// mutex, which burns a core and competes with runOnce() and
				// waitForFinish() for the same lock.
				ofSleepMillis(1);
			}
		}
	}

	// Block the calling thread until the pending runOnce() request (if any)
	// has been carried out. Returns immediately if the thread is not running
	// or is auto-looping.
	//
	// Merely acquiring and releasing the lock is not enough: if the worker has
	// not yet noticed the request, the lock is free and the caller would carry
	// on before update() had even started. So poll the completion flag.
	void waitForFinish() {
		while(isThreadRunning()) {
			if(!lock()) return;		// thread was stopped in the meantime
			const bool finished = bAutoLoop || bHasRunThisFrame;
			unlock();
			if(finished) return;
			ofSleepMillis(1);		// give the worker a chance to take the lock
		}
	}

	// This is the function that gets overridden with the actual work.
	virtual void update() = 0;
};
and it's based on this OpenFrameworks class:
Code:
// Small thread wrapper with one mutex per object. Derive from it and
// override threadedFunction(); startThread() runs that function on a new
// thread (Win32 thread or pthread, selected by TARGET_WIN32).
class ofxThread{

	public:
		ofxThread();
		virtual ~ofxThread();

		// Value of the running flag.
		bool isThreadRunning();

		// Create the thread. _blocking selects whether lock() waits for the
		// mutex or gives up when it is busy; _verbose enables printf logging.
		void startThread(bool _blocking = true, bool _verbose = true);

		// Acquire / release the per-object mutex. Both return false when the
		// thread has not been started.
		bool lock();
		bool unlock();

		void stopThread();

	protected:

		//-------------------------------------------------
		// Override this with the function you want to run on the thread.
		// The default only prints a reminder when verbose logging is on.
		virtual void threadedFunction(){
			if(!verbose) return;
			printf("ofxThread: overide threadedFunction with your own\n");
		}

		//-------------------------------------------------
		// Entry point handed to the OS thread API. objPtr is the ofxThread
		// instance passed by startThread(); the call is forwarded to its
		// (virtual) threadedFunction().

		#ifdef TARGET_WIN32
			static unsigned int __stdcall thread(void * objPtr){
				static_cast<ofxThread*>(objPtr)->threadedFunction();
				return 0;
			}

		#else
			static void * thread(void * objPtr){
				static_cast<ofxThread*>(objPtr)->threadedFunction();
				return 0;
			}
		#endif


	// Platform thread handle and the mutex used by lock()/unlock().
	#ifdef TARGET_WIN32
			HANDLE            myThread;
			CRITICAL_SECTION  critSec;  	//same as a mutex
	#else
			pthread_t        myThread;
			pthread_mutex_t  myMutex;
	#endif

	bool threadRunning;		// set by startThread(), cleared by stopThread()
	bool blocking;			// lock() waits for the mutex when true, tries once when false
	bool verbose;			// print diagnostics with printf when true
};


//-------------------------------------------------
// Construct an idle wrapper: no thread exists until startThread() is called.
ofxThread::ofxThread(){
	threadRunning = false;

	// lock(), unlock(), stopThread() and the destructor all read `verbose`
	// (and lock() reads `blocking`) even when startThread() was never called,
	// so give both a defined value instead of leaving them indeterminate.
	// startThread() overwrites them with the caller's choices.
	blocking = true;
	verbose = false;
}

//-------------------------------------------------
// Destructor: delegates to stopThread(), which releases the thread handle and
// mutex if the thread was started and otherwise does nothing but log.
ofxThread::~ofxThread(){
	this->stopThread();
}

//-------------------------------------------------
// Returns the flag that startThread() sets and stopThread() clears.
// Read-only access; no lock is taken here.
bool ofxThread::isThreadRunning(){
	const bool running = threadRunning;
	return running;
}

//-------------------------------------------------
// Create the mutex and launch threadedFunction() on a new thread.
//   _blocking: true  -> lock() waits until the mutex is free
//              false -> lock() tries once and returns false if it is busy
//   _verbose:  print diagnostics with printf
// If the thread cannot be created, the mutex is released again and
// isThreadRunning() reports false.
//
// NOTE(review): calling this again without an intervening stopThread()
// re-initialises a mutex that may be in use and overwrites the previous thread
// handle; callers should pair every startThread() with a stopThread().
void ofxThread::startThread(bool _blocking, bool _verbose){

	// Store the options BEFORE the thread exists: the new thread may call
	// lock() (which reads both) before the creation call returns.
	blocking		= _blocking;
	verbose			= _verbose;

	//have to put this here because the thread can be running
	//before the call to create it returns
	threadRunning	= true;

	#ifdef TARGET_WIN32
		InitializeCriticalSection(&critSec);
		myThread = (HANDLE)_beginthreadex(NULL, 0, this->thread,  (void *)this, 0, NULL);
		const bool created = (myThread != NULL);	// _beginthreadex returns 0 on failure
		if(!created) DeleteCriticalSection(&critSec);
	#else
		pthread_mutex_init(&myMutex, NULL);
		const bool created = (pthread_create(&myThread, NULL, thread, (void *)this) == 0);
		if(!created) pthread_mutex_destroy(&myMutex);
	#endif

	if(!created){
		// Do not claim to be running when no thread exists; otherwise lock()
		// would operate on a mutex nobody owns and stopThread() would act on
		// an invalid thread handle.
		threadRunning = false;
		if(verbose)printf("ofxThread: could not create thread\n");
	}
}

//-------------------------------------------------
//returns false if it can't lock
// Acquire the per-object mutex.
// Returns false if the thread has not been started, or - in non-blocking
// mode - if the mutex is currently held by someone else. In blocking mode the
// call waits until the mutex becomes available.
bool ofxThread::lock(){
	// The mutex is only initialised by startThread().
	if(!threadRunning){
		if(verbose)printf("ofxThread: need to call startThread first\n");
		return false;
	}

	#ifdef TARGET_WIN32
		if(!blocking){
			// Single attempt; give up straight away when busy.
			if(!TryEnterCriticalSection(&critSec)){
				if(verbose)printf("ofxThread: mutext is busy \n");
				return false;
			}
		}else{
			EnterCriticalSection(&critSec);
		}
		if(verbose)printf("ofxThread: we are in -- mutext is now locked \n");
	#else
		if(!blocking){
			// Single attempt; pthread_mutex_trylock returns 0 on success.
			if(pthread_mutex_trylock(&myMutex) != 0){
				if(verbose)printf("ofxThread: mutext is busy - already locked\n");
				return false;
			}
		}else{
			if(verbose)printf("ofxThread: waiting till mutext is unlocked\n");
			pthread_mutex_lock(&myMutex);
		}
		if(verbose)printf("ofxThread: we are in -- mutext is now locked \n");
	#endif

	return true;
}

//-------------------------------------------------
// Release the per-object mutex.
// Returns false (without touching the mutex) when the thread is not marked
// as running; otherwise releases the mutex and returns true.
bool ofxThread::unlock(){

	if(threadRunning){
		#ifdef TARGET_WIN32
			LeaveCriticalSection(&critSec);
		#else
			pthread_mutex_unlock(&myMutex);
		#endif

		if(verbose)printf("ofxThread: we are out -- mutext is now unlocked \n");
		return true;
	}

	if(verbose)printf("ofxThread: need to call startThread first\n");
	return false;
}

//-------------------------------------------------
// Mark the thread as stopped and release the OS resources created by
// startThread(). Does nothing (apart from logging) if the thread is not
// running.
//
// This does NOT wait for threadedFunction() to return: the thread is only
// told to stop via the running flag, so threadedFunction() implementations
// should poll isThreadRunning() and exit promptly.
void ofxThread::stopThread(){
	if(threadRunning){
		// Clear the flag FIRST. lock() and unlock() refuse to touch the mutex
		// once it is false, and worker loops testing isThreadRunning() start
		// winding down, so the mutex is no longer handed out while it is being
		// destroyed below.
		threadRunning = false;

		#ifdef TARGET_WIN32
			// startThread() initialises the critical section on every start,
			// so it has to be deleted here or its resources leak.
			DeleteCriticalSection(&critSec);
			CloseHandle(myThread);
		#else
			pthread_mutex_destroy(&myMutex);
			// Detach so the thread's resources are reclaimed when it exits.
			pthread_detach(myThread);
		#endif
		if(verbose)printf("ofxThread: thread stopped\n");
	}else{
		if(verbose)printf("ofxThread: thread already stopped\n");
	}
}