A Thread that doesn't hold long enough
I've got a process called Arrivals, this process has mean arrival time of 8 seconds.
I've created an ExponentialStream like this,
Code:
#ifndef _ARRIVALS_H_
#define _ARRIVALS_H_
//#ifndef PROCESS_H_
# include "Process.h"
//#endif
//#ifndef RANDOM_H_
# include "SimuDLL\Random.h"
//#endif
class Arrivals : public Process {
public:
Arrivals (double);
virtual ~Arrivals ();
virtual void Body ();
private:
ExponentialStream* InterArrivalTime;
};
#endif
Code:
/*
* Copyright (C) 1994-1998,
*
* Department of Computing Science,
* The University,
* Newcastle upon Tyne,
* UK.
*/
#ifndef ARRIVALS_H_
# include "Arrivals.h"
#endif
#ifndef JOB_H_
# include "Job.h"
#endif
#include "CMinMax.h"
Arrivals::Arrivals (double mean)
: InterArrivalTime(new ExponentialStream(mean))
{
}
Arrivals::~Arrivals () { delete InterArrivalTime; }
void Arrivals::Body ()
{
for (;;)
{
double arrivalTime = (*InterArrivalTime)();
// Hold a number of intervals, before a new lorry arrives
Hold(arrivalTime);
CMinMax<int> lorryRand(1,3);
int lorryId = lorryRand.GetRandomNumber();
Job* work = new Job(lorryId);
}
}
I've triggered this process in the main routine of the program
Code:
Sim::Sim()
{
m_Arrivals = new Arrivals(8);
#ifndef NO_RESOURCE
Resources::ref(m_Arrivals);
#endif
m_Arrivals->Activate();
m_Arrivals->Resume();
Scheduler::scheduler().Resume();
}
But the arrivals of different events came so fast so that I even can't identify which is which.
(They actually came in under 0.1 s, not around 8 seconds), I conclude that the Suspend method doesn't even hold the thread. I wonder why.... I don't know this code would be where the core problem is, I will post more later.
Thanks
Jack
Code:
#ifndef __GNUG__
static double SimulatedTime = 0.0;
#else
double SimulatedTime = 0.0;
#endif
#ifndef TESTQUEUE
static Queue_Type ReadyQueue; // Queue_Type is replaced by cpp
#else
Queue_Type ReadyQueue;
#endif
static Mutex* _theMutex = Mutex::create();
Scheduler* Scheduler::theScheduler = (Scheduler*) 0;
Boolean Scheduler::schedulerRunning = FALSE;
Process* Process::Current = (Process*) 0;
const double Process::Never = -1; // Process will never awaken.
/*
* Note: unlike in SIMULA, an active process is removed from the simulation
* queue prior to being activated.
*/
//
// Class Scheduler
//
// scheduler is just an object, with no associated thread.
Scheduler::Scheduler ()
{
}
Scheduler::~Scheduler ()
{
}
/*
* This routine resets the simulation time to zero and removes all
* entries from the scheduler queue (as their times may no longer
* be valid). Such entries are suspended as though Cancel had been
* called on them by a "user" process.
*
* Note: if this is to be used to reset the simulation environment
* for another "run" then some means of re-initializing these queue
* entries will be required. This is up to the user to provide and
* invoke prior to their being used again.
*/
void Scheduler::reset () const
{
Process* tmp = (Process*) 0;
do
{
tmp = ReadyQueue.Remove();
if (tmp)
{
tmp->Cancel();
tmp->reset(); // call user-definable reset routine.
}
} while (tmp);
SimulatedTime = 0.0;
}
double Scheduler::CurrentTime () const { return SimulatedTime; }
void Scheduler::print (std::ostream& strm)
{
strm << "Scheduler queue:\n" << std::endl;
ReadyQueue.print(strm);
strm << "End of scheduler queue." << std::endl;
}
//
// Class Process
//
/*
* Before this process is deleted, make sure it
* is removed from the queue, or we could have
* problems with pointer dereferencing!
*/
Process::~Process ()
{
/*
* We don't call Cancel of terminate here since they will
* attempt to suspend this process if it is running, and
* we do not want that to occur - garbage could quickly
* build up. So, we let the destructor run to completion
* which will implicitly suspend the process anyway, and
* let the thread specific destructor then decide whether
* it needs to do anything specific (e.g., reactivate the
* scheduler) before it finishes.
*/
if (!Terminated)
{
Terminated = TRUE;
Passivated = TRUE;
unschedule(); // remove from scheduler queue
wakeuptime = Process::Never;
if (this == Process::Current)
{
schedule();
_theMutex->unlock();
}
}
}
double Process::CurrentTime () { return SimulatedTime; }
void Process::set_evtime (double time)
{
if (!idle()) // error if we are not scheduled for activation
{
if (time >= Process::CurrentTime())
wakeuptime = time;
else
error_stream << WARNING
<< "Process::set_evtime - time " << time
<< " invalid" << std::endl;
}
else
error_stream << WARNING
<< "Process::set_evtime called for idle process." << std::endl;
}
// return process to be activated after this process.
const Process* Process::next_ev () const
{
return ((!idle()) ? ReadyQueue.getNext(this) : (Process*) 0);
}
/*
* These routines are slightly different to the SIMULA
* equivalents in that a process must be given, and that
* process must be scheduled. Complete compatibility
* may be provided in future releases.
*/
void Process::ActivateBefore (Process &p)
{
// No op if already scheduled
if (Terminated || !idle()) return;
Passivated = FALSE;
if (ReadyQueue.InsertBefore(*this, p))
wakeuptime = p.wakeuptime;
else
error_stream << WARNING << "ActivateBefore failed because 'before' process is not scheduled" << std::endl;
}
void Process::ActivateAfter (Process &p)
{
// No op if already scheduled
if (Terminated || !idle()) return;
Passivated = FALSE;
if (ReadyQueue.InsertAfter(*this, p))
wakeuptime = p.wakeuptime;
else
error_stream << WARNING << "ActivateAfter failed because 'after' process is not scheduled" << std::endl;
}
/*
* These routines differ from their SIMULA counterparts in
* that a negative AtTime is considered an error, and will
* not cause the process to be activated at the current time.
* This may change in a future release, when we may provide
* a "SIMULA compatibility" mode.
*/
void Process::ActivateAt (double AtTime, Boolean prior)
{
// No op if already scheduled
if ((AtTime < Process::CurrentTime()) || Terminated || !idle()) return;
Passivated = FALSE;
wakeuptime = AtTime;
ReadyQueue.Insert(*this, prior);
}
void Process::ActivateDelay (double Delay, Boolean prior)
{
// No op if already scheduled
if (!checkTime(Delay) || Terminated || !idle()) return;
Passivated = FALSE;
wakeuptime = SimulatedTime + Delay;
ReadyQueue.Insert(*this, prior);
}
void Process::Activate ()
{
// No op if already scheduled
if (Terminated || !idle()) return;
Passivated = FALSE;
wakeuptime = CurrentTime();
ReadyQueue.Insert(*this, TRUE);
}
/*
* Similarly, there are four ways to reactivate
* Note that if a process is already scheduled, the reactivate
* will simply re-schedule the process.
*/
void Process::ReActivateBefore (Process &p)
{
if (Terminated) return;
unschedule();
ActivateBefore(p);
if (Process::Current == this)
Suspend();
}
void Process::ReActivateAfter (Process &p)
{
if (Terminated) return;
unschedule();
ActivateAfter(p);
if (Process::Current == this)
Suspend();
}
void Process::ReActivateAt (double AtTime, Boolean prior)
{
if (Terminated) return;
unschedule();
ActivateAt(AtTime, prior);
if (Process::Current == this)
Suspend();
}
void Process::ReActivateDelay (double Delay, Boolean prior)
{
if (Terminated) return;
unschedule();
ActivateDelay(Delay, prior);
if (Process::Current == this)
Suspend();
}
void Process::ReActivate ()
{
if (Terminated) return;
unschedule();
Activate();
if (Process::Current == this)
Suspend();
}
Boolean Process::schedule ()
{
if (Scheduler::simulationStarted())
{
Boolean doSuspend = TRUE;
Process::Current = ReadyQueue.Remove();
if (Process::Current == (Process*) 0) // all done
{
std::cout << "Scheduler queue is empty. Simulation ending" << std::endl;
Thread::Exit();
}
if (Process::Current->evtime() < 0)
{
error_stream << WARNING << "Scheduler Error: Process WakeupTime "
<< Process::Current->evtime() << " invalid.\n";
}
else
SimulatedTime = Process::Current->evtime();
#ifdef DEBUG
debug_stream << FUNCTIONS << FAC_SCHEDULER << VIS_PUBLIC;
debug_stream << "Simulated time is now " << SimulatedTime << std::endl;
#endif
if (Process::Current != this)
Process::Current->Resume();
else
doSuspend = FALSE;
return doSuspend;
}
else
return FALSE;
}
// only called if process is running or on queue to be run
void Process::unschedule ()
{
if (!idle())
{
if (this != Process::Current)
(void) ReadyQueue.Remove(this); // remove from queue
wakeuptime = Process::Never;
Passivated = TRUE;
}
}
// suspend current process for simulated time t
void Process::Hold (double t)
{
if (checkTime(t))
{
if ((this == Process::Current) || (!Process::Current))
{
wakeuptime = Process::Never;
ActivateDelay(t);
Suspend();
}
else
error_stream << WARNING
<< "Process::Hold - can only be applied to active object."
<< std::endl;
}
else
error_stream << WARNING << "Process::Hold - time " << t
<< " invalid." << std::endl;
}
/*
* We add these routines because we may need to do some
* Process specific manipulations prior to calling the
* thread specific implementations.
*/
void Process::Suspend ()
{
if (schedule())
{
_theMutex->unlock();
Thread::Suspend();
_theMutex->lock();
}
}
Re: A Thread that doesn't hold long enough
nt_thread.cc
Code:
/*
* Copyright (C) 1994-1997, 1998,
*
* Department of Computing Science,
* The University,
* Newcastle upon Tyne,
* UK.
*
* $Id: nt_thread.cc,v 1.14 1998/08/28 14:19:43 nmcl Exp $
*/
#ifndef NT_CONFIGURE_H_
# include "Nt_Configure.h"
#endif
#include <iostream>
#ifndef RESOURCE_H_
# include "Resource.h"
#endif
#ifndef ERROR_H_
# include "Error.h"
#endif
#ifndef NTTHREAD_H_
# include "nt_thread.h"
#endif
static const int MaxPriority = THREAD_PRIORITY_NORMAL;
Thread* ThreadData::mainThread = 0;
ThreadData::ThreadData ()
: _prio(MaxPriority+1),
sem(CreateSemaphore(NULL, 0, 1, 0)),
thrHandle(0),
mid(0),
dead(FALSE)
{
}
ThreadData::~ThreadData ()
{
}
DWORD ThreadData::Execute (LPDWORD p1)
{
Thread* _p1 = (Thread*) p1;
WaitForSingleObject(_p1->_data->sem, INFINITE);
if (!_p1->_data->dead)
{
_p1->Body();
_p1->_data->dead = TRUE;
}
return 0;
}
Thread::Thread (Boolean createThread)
: thread_key(-1),
_data(new ThreadData),
next(0),
prev(0)
{
if (createThread)
{
LPVOID p1 = (LPVOID) this;
_data->thrHandle = CreateThread(NULL, 0,
(LPTHREAD_START_ROUTINE) ThreadData::Execute, p1,
0, &_data->mid);
SetThreadPriority(_data->thrHandle, _data->_prio);
thread_key = (long) _data->mid;
}
else
{
_data->mid = GetCurrentThreadId();
thread_key = _data->mid;
ThreadData::mainThread = this;
}
Insert((Thread*) 0, this);
}
Thread::Thread (unsigned long stackSize)
: thread_key(-1),
_data(new ThreadData),
next(0),
prev(0)
{
LPVOID p1 = (LPVOID) this;
_data->thrHandle = CreateThread(NULL, stackSize,
(LPTHREAD_START_ROUTINE) ThreadData::Execute, p1,
0, &_data->mid);
SetThreadPriority(_data->thrHandle, _data->_prio);
thread_key = (long) _data->mid;
Insert((Thread*) 0, this);
}
// tidy things up before we terminate thread
Thread::~Thread ()
{
Remove(this);
terminateThread();
delete _data;
}
void Thread::terminateThread ()
{
/*
* If one thread terminates another then we must switch contexts
* to the terminating thread for it to tidy itself up. Then that
* thread must switch back to us.
*/
_data->dead = TRUE;
if (_data->mid == GetCurrentThreadId())
{
ReleaseSemaphore(_data->sem, 1, NULL);
ExitThread(0);
}
else
{
Thread::Resume();
WaitForSingleObject(_data->sem, INFINITE);
}
}
long Thread::Current_Thread () const
{
return GetCurrentThreadId();
}
void Thread::Suspend ()
{
Thread* ptr = (Thread*) Thread::Self();
if ((ptr == ThreadData::mainThread) && (ptr != this))
return;
if (ptr == this)
{
WaitForSingleObject(_data->sem, INFINITE);
if (_data->dead)
terminateThread();
}
}
void Thread::Resume ()
{
Thread* ptr = (Thread*) Thread::Self();
ReleaseSemaphore(_data->sem, 1, NULL);
}
std::ostream& Thread::print (std::ostream& strm) const
{
strm << "Thread type is NT threads.\n";
strm << "\nThread key: " << thread_key << "\n";
return strm;
}
//
// Getting the main thread into the thread list...
//
class NT_Main_Thread : public Thread
{
public:
NT_Main_Thread ();
~NT_Main_Thread ();
void Body ();
static NT_Main_Thread* mainThread;
};
NT_Main_Thread* NT_Main_Thread::mainThread = 0;
NT_Main_Thread::NT_Main_Thread ()
: Thread((Boolean) FALSE)
{
}
NT_Main_Thread::~NT_Main_Thread () {}
void NT_Main_Thread::Body () {}
void Thread::Initialize ()
{
if (!_initialized)
{
_initialized = TRUE;
SetThreadPriority(GetCurrentThread(), MaxPriority+1);
NT_Main_Thread::mainThread = new NT_Main_Thread;
}
}
void Thread::Exit (int retValue)
{
exit(retValue);
}
void Thread::mainResume ()
{
NT_Main_Thread::mainThread->Resume();
}
/*
* The mutex.
*/
NT_Mutex::NT_Mutex ()
{
_theLock = CreateMutex(NULL, 0, NULL);
}
NT_Mutex::~NT_Mutex ()
{
CloseHandle(_theLock);
}
Boolean NT_Mutex::lock ()
{
if (WaitForSingleObject(_theLock, INFINITE) != WAIT_FAILED)
return TRUE;
else
return FALSE;
}
Boolean NT_Mutex::unlock ()
{
if (ReleaseMutex(_theLock) == 0)
return TRUE;
else
return FALSE;
}
/*
* Now the Mutex create method.
*/
Mutex* Mutex::create ()
{
return new NT_Mutex;
}
Re: A Thread that doesn't hold long enough
Quote:
Originally Posted by
lucky6969b
But the arrivals of different events came so fast so that I even can't identify which is which.
(They actually came in under 0.1 s, not around 8 seconds), I conclude that the Suspend method doesn't even hold the thread. I wonder why.... I don't know this code would be where the core problem is, I will post more later.
Thanks
Jack
Why not add some logging and figure it out for sure. Btw, are you familiar with the concept of RAII with respect to thread synchronization?
Re: A Thread that doesn't hold long enough
Hello Arjay,
I suddenly realized that I need to open up a main thread that derives from Process.
But I need to suspend the main thread in order for the child threads to execute.
So my new question is how do I suspend the main thread of other child threads without freezing the program?
Code:
Sim::Sim() {
Thread::Initialize(); // Initialize the threads package.
// Create a main thread, m_Arrival is a child thread which has no parent
// so that Process::Current == this because the m_Arrival thread == main thread
m_Arrivals = new Arrival_MainThread(false);
// Main Thread has to be suspended in order for child threads to execute
// but if I suspend the main thread here, it will freeze the program,
// because the main thread never terminates
m_Arrivals->Await();
}
Update:
Should I make this a daemon process?
How do I detach the child thread from the main thread in win32?
Thanks
Jack
Re: A Thread that doesn't hold long enough
Quote:
Originally Posted by
lucky6969b
Should I make this a daemon process?
How do I detach the child thread from the main thread in win32?
These are not Windows concepts as far as I know. I've heard about this on Linux, but not on Windows.
At any rate, threads should execute independent of other threads. So your design should generally avoid having to do anything in one thread that impacts other threads (the exception to this is synchronizing shared data).
With this in mind, what you wrote below puzzles me.
Quote:
But I need to suspend the main thread in order for the child threads to execute.
Re: A Thread that doesn't hold long enough
A daemon process in Linix is like a service in Windows. Also in Linix you can easily fork a process which is more complicated under Windows.
There are Windows API functions to suspend/resume a thread but if you need to use them chances are good that your design isn't the best. See
http://msdn.microsoft.com/en-us/libr...=vs.85%29.aspx
http://msdn.microsoft.com/en-us/libr...=vs.85%29.aspx
You might also want to consider the Windows synchronization functions. See
http://msdn.microsoft.com/en-us/libr...=vs.85%29.aspx
or the multi-threaing support in c++11. See
http://www.cplusplus.com/reference/multithreading/
Once a thread has been started it runs 'independently' of the thread that created it (subject to synchronization) until the process to which it belongs is destroyed. Destroying the main() thread is akin to destroying the process. So I don't understand what you mean by 'suspending the main thread'?
Re: A Thread that doesn't hold long enough
If you have the time to take a look at the source code, it would be greatly appreciated!
I stuck with it for a week already. It's pretty simple and light-weight except that I just can't figure out
the thread part
Thank you
Re: A Thread that doesn't hold long enough
Zip up the solution and post it here.
1 Attachment(s)
Re: A Thread that doesn't hold long enough
Hello,
I have stripped all the unnecessary files.
Hope you can compile it, easy task for you ;)
Thanks
Jack
Re: A Thread that doesn't hold long enough
Hello,
I work on it today. And finally find out that the Arrivals Suspension only works when there is a machine available
Thanks people
Jack