Threading


laasunde
January 12th, 2009, 07:31 AM
I'm trying to develop a consumer thread that handles asynchronous operations. This has been quite tricky so far, and I'm not completely comfortable with all aspects of multi-threading, so I was hoping someone here could review the code.

The variables used (and interface) :

public interface IJob
{
    void Execute();
}

enum StateTypes { Created, Running, Waiting, Stopping, Stopped }
private volatile StateTypes state;

private object stateLocker = new object();

private ThreadQueue queue = new ThreadQueue();


Here is ThreadQueue

private class ThreadQueue
{
    private Queue<IJob> queue = new Queue<IJob>();
    private object locker = new object();

    internal IJob Dequeue()
    {
        lock (locker)
        {
            if (queue.Count > 0)
            {
                return queue.Dequeue();
            }
            return null;
        }
    }

    internal void Enqueue(IJob job)
    {
        lock (locker) queue.Enqueue(job);
    }

    internal void Clear()
    {
        lock (locker) queue.Clear();
    }

    internal int Count()
    {
        lock (locker) return queue.Count;
    }
}


The ThreadMain method is executed in a separate background thread. The methods Add(), StopExecuting() and StartExecuting() will be called from various other threads.

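void ThreadMain()
{
    // bThreadStop (declaration not shown) is set to end the thread.
    while (!bThreadStop)
    {
        IJob job = null;
        lock (stateLocker)
        {
            if (state == StateTypes.Running) job = queue.Dequeue();
        }
        if (job != null) job.Execute();

        lock (stateLocker)
        {
            if (state == StateTypes.Running && job == null && queue.Count() == 0)
            {
                // Idle: sleep until Add() pulses, then resume running.
                state = StateTypes.Waiting;
                while (state == StateTypes.Waiting && queue.Count() == 0)
                    Monitor.Wait(stateLocker);
                if (state == StateTypes.Waiting) state = StateTypes.Running;
            }
            if (state == StateTypes.Stopping)
            {
                // Tell StopExecuting() we are idle, then sleep until restarted.
                state = StateTypes.Stopped;
                Monitor.PulseAll(stateLocker);
                while (state == StateTypes.Stopped) Monitor.Wait(stateLocker);
            }
        }
    }
}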

/* This method shall block until the worker thread is actually not executing any tasks */
private void StopExecuting()
{
    lock (stateLocker)
    {
        /* Return if the thread is already stopped or idle */
        if (state == StateTypes.Stopped || state == StateTypes.Waiting) return;

        /* Set state to Stopping, then wait for a pulse and for state to equal Stopped */
        state = StateTypes.Stopping;
        while (state != StateTypes.Stopped) Monitor.Wait(stateLocker);
    }
}

private void StartExecuting()
{
    lock (stateLocker)
    {
        if (state == StateTypes.Stopped)
        {
            state = StateTypes.Running;
            Monitor.Pulse(stateLocker);
        }
    }
}

public void Add(IJob job)
{
    queue.Enqueue(job);
    lock (stateLocker)
    {
        // Monitor.Pulse needs the object whose waiters should be woken.
        if (state == StateTypes.Waiting) Monitor.Pulse(stateLocker);
    }
}

Mutant_Fruit
January 12th, 2009, 08:28 AM
private void StopExecuting()
{
    lock (stateLocker)
    {
        /* Return if state already is stopped */
        if (state == Stopped || state == Waiting) return;

        /* Set state stopping and wait for pulse and state equal Stopped */
        state = Stopping;
        while (state != Stopped) Monitor.Wait(stateLocker);
    }
}

You've locked on 'stateLocker' and then wait for it to get pulsed while that lock is held. This will lock up your app because 'Add' cannot get a lock on stateLocker and so you can't ever add anything to the queue and pulse the locker.

Also, writing multiple statements on one line isn't great. It makes it harder to see what's going on.

For something like this I'd recommend using an AutoResetEvent or ManualResetEvent rather than using Pulse. The docs on Pulse explain the issues better (http://msdn.microsoft.com/en-us/library/system.threading.monitor.pulse.aspx).

laasunde
January 12th, 2009, 09:07 AM
Here is the original solution for this problem using EventWaitHandles.


object stateLocker = new object();
ThreadQueue queue = new ThreadQueue();

EventWaitHandle WaitSignal = new AutoResetEvent(false);
EventWaitHandle WaitOn = new AutoResetEvent(false);
enum State { Created, Started, Stopped };
State state = State.Created;

void ThreadMain()
{
    // bThreadStop (declaration not shown) is set to end the thread.
    while (!bThreadStop)
    {
        IJob job = null;
        lock (stateLocker)
        {
            if (state == State.Started)
            {
                job = queue.Dequeue();
            }
        }
        if (job != null)
        {
            job.Execute();
        }
        else
        {
            // Signal WaitSignal, then block until WaitOn is set.
            WaitHandle.SignalAndWait(WaitSignal, WaitOn);
        }
    }
}


The problem with this solution is that I could never figure out how to stop the execution of the background thread properly (i.e. implement StopExecuting). I need to be certain that no execution is performed after StopExecuting has returned. After StopExecuting, most of the application's resources will be closed, so any IJob still running in the thread at that point will cause problems.

/* Can lead to inconsistent data */
private void StopExecuting1()
{
    lock (stateLocker)
    {
        state = State.Stopped;
    }
    WaitSignal.WaitOne();
}

/* Will lead to deadlock */
private void StopExecuting2()
{
    lock (stateLocker)
    {
        state = State.Stopped;
        WaitSignal.WaitOne();
    }
}


And then..

private void StartExecuting2()
{
    lock (stateLocker)
    {
        if (state == State.Stopped)
        {
            state = State.Started;
            WaitOn.Set();
        }
    }
}

public void Add(IJob job)
{
    queue.Enqueue(job);
    lock (stateLocker)
    {
        if (state == State.Stopped)
        {
            WaitOn.Set();
        }
    }
}

Mutant_Fruit
January 12th, 2009, 07:30 PM
public class TaskManager
{
    bool stop;
    ManualResetEvent handle = new ManualResetEvent(false);
    ManualResetEvent stoppingHandle = new ManualResetEvent(false);
    object locker = new object();
    Queue<object> tasks = new Queue<object>();

    public void EnqueueTask(object o)
    {
        lock (locker)
        {
            tasks.Enqueue(o);
            handle.Set();
        }
    }

    void Loop()
    {
        while (true)
        {
            object task = null;

            lock (locker)
            {
                if (tasks.Count > 0)
                    task = tasks.Dequeue();

                if (tasks.Count == 0)
                    handle.Reset();
            }

            if (task != null)
                ExecuteTask(task);

            handle.WaitOne();

            // Check to see if we should exit as soon as we wake up from the waithandle.
            if (stop)
            {
                stoppingHandle.Set();
                return;
            }
        }
    }

    public void Stop()
    {
        // If you call stop while a task is in the middle of executing, you will block until the task completes
        // then the 'stoppingHandle' will be triggered.
        stop = true;
        handle.Set();
        stoppingHandle.WaitOne();
    }
}

This is a very quick example I threw together to show you how to implement a threadsafe task queue with threadsafe stopping, whereby you don't stop while a task is in the middle of executing.

If you want all pending tasks to execute before you exit, just change the "if (stop)" line to be "if (stop && tasks.Count == 0)". Though those details are best left up to yourself ;)



/* Will lead to deadlock */
private void StopExecuting2()
{
    lock (stateLocker)
    {
        state = State.Stopped;
        WaitSignal.WaitOne();
    }
}


It's probably deadlocking because you're waiting on a handle while still holding a lock. If you look at your code, you need stateLocker to access either WaitSignal or WaitOn. So if you call 'StopExecuting2', you can never signal WaitOn because you can never get the lock.

Bear in mind that the WaitHandle classes are 'threadsafe' in that it is perfectly safe to be calling methods on them from any thread at any time. You can see that I call 'WaitOne' without any locks.
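As a rough sketch of that rule (hold the lock only to change state, then block on the handle with no lock held), something like the following could work. The class name StoppableWorker and all of its members are made up for illustration and are not from the code posted above; Thread.Sleep stands in for executing a job.

```csharp
using System.Threading;

// Hypothetical illustration only; not the code from the thread.
public class StoppableWorker
{
    private readonly object stateLocker = new object();
    private readonly ManualResetEvent stopped = new ManualResetEvent(false);
    private bool stopRequested;

    // Body of the background thread.
    public void Run()
    {
        while (true)
        {
            lock (stateLocker)
            {
                // Leaving the while loop also releases the lock.
                if (stopRequested) break;
            }
            Thread.Sleep(1); // stand-in for executing one job
        }
        stopped.Set(); // no job runs after this point
    }

    // Blocks until the worker has left its loop.
    public void Stop()
    {
        lock (stateLocker)
        {
            stopRequested = true; // the lock is held only to change state
        }
        stopped.WaitOne();        // wait with no lock held, so Run() can still take it
    }
}
```

If stopped.WaitOne() were moved inside the lock block, Run() could never acquire stateLocker to see the request, which is the same shape of deadlock as in StopExecuting2.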

You'll also notice that I take a lock when I'm dequeuing tasks and have a little bit of logic in there to set the WaitHandle to its unsignalled state when there are no further tasks to execute. The reason why this logic is inside the lock is that otherwise there'd be a race condition as follows:
Thread1: Check to see if queue is empty - it is
*Thread interrupt*
Thread2: Enqueue a task
Thread2: Set handle to signalled
*Thread finish, switch to Thread1*
Thread1: Set handle to unsignalled.

There are smarter ways around this issue using an AutoResetEvent, but I think this is easier to understand.
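To make the locking rule concrete, here is a small sketch in which the "is the queue empty?" check and the Reset() happen under the same lock that Enqueue uses for its Set(). SignalledQueue, TryDequeue and HasPendingSignal are hypothetical names invented for this illustration.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical illustration only; not the code from the thread.
public class SignalledQueue
{
    private readonly object locker = new object();
    private readonly Queue<object> tasks = new Queue<object>();
    private readonly ManualResetEvent handle = new ManualResetEvent(false);

    public void Enqueue(object task)
    {
        lock (locker)
        {
            tasks.Enqueue(task);
            handle.Set(); // signalled: at least one task is pending
        }
    }

    // Returns the next task, or null if the queue was empty.
    public object TryDequeue()
    {
        lock (locker)
        {
            object task = tasks.Count > 0 ? tasks.Dequeue() : null;
            // The emptiness check and the Reset() share the lock with Enqueue,
            // so a concurrent Set() cannot be overwritten by a stale Reset().
            if (tasks.Count == 0)
                handle.Reset();
            return task;
        }
    }

    // True if the handle is currently signalled; a zero timeout never blocks.
    public bool HasPendingSignal()
    {
        return handle.WaitOne(0);
    }
}
```

With this arrangement the interleaving listed above cannot occur: Thread2 cannot enqueue and signal between Thread1's check and its Reset(), because both threads need locker.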

laasunde
January 13th, 2009, 01:21 AM
Thank you.

Currently the Stop() method exits the background thread which means a re-start would have to re-create a new background thread. So I would like to keep the background thread alive during a stop and then re-start it. Would the below change fix that?


EventWaitHandle reStartHandle = new AutoResetEvent(false);

void Loop()
{
    while (true)
    {
        object task = null;

        lock (locker)
        {
            if (tasks.Count > 0)
                task = tasks.Dequeue();

            if (tasks.Count == 0)
                handle.Reset();
        }

        if (task != null)
            ExecuteTask(task);

        handle.WaitOne();

        if (stop)
        {
            stoppingHandle.Set();
            reStartHandle.WaitOne();
        }
    }
}

public void Start()
{
    if (stop)
    {
        reStartHandle.Set();
        stop = false;
    }
}


Another thing, to force the Stop() to occur as quickly as possible would this be a good approach?

lock (locker)
{
    if (!stop)
    {
        if (tasks.Count > 0)
            task = tasks.Dequeue();

        if (tasks.Count == 0)
            handle.Reset();
    }
}


In #3 I used a state variable and a stateLocker to protect / synchronize access to the variable in a multi-threaded environment. I see in your example you rely on using a boolean and no protection. Just curious, was I being too cautious in my approach, or is there a difference between enum and bool in terms of threading behaviour?

There are smarter ways around this issue using an AutoResetEvent, but I think this is easier to understand.

This bit sounds interesting. Care to elaborate on how AutoResetEvent would improve the solution?

Mutant_Fruit
January 13th, 2009, 02:53 AM
public void Start()
{
    if (stop)
    {
        reStartHandle.Set();
        stop = false;
    }
}

There's a race condition ;) You should set stop to false *before* allowing the other thread to run, otherwise it could execute and hit the "if (stop)" line and the loop will stop before you set the variable to false. You also forgot to reset the 'stoppingHandle'.

public void Start()
{
    if (stop)
    {
        stop = false;
        stoppingHandle.Reset();
        reStartHandle.Set();
    }
}


You also need to reset the stoppingHandle here.


lock (locker)
{
    if (!stop)
    {
        if (tasks.Count > 0)
            task = tasks.Dequeue();

        if (tasks.Count == 0)
            handle.Reset();
    }
}

There's no need for this. You already check to see if you should exit as soon as you wake up from the handle.


In #3 I used a state variable and a stateLocker to protect / synchronize access to the variable in a multi-threaded environment. I see in your example you rely on using a boolean and no protection. Just curious, was I being too cautious in my approach, or is there a difference between enum and bool in terms of threading behaviour?

This should really be 'volatile'. That's what you get for quick examples ;) The reason why it has no locking or explicit thread safety is that it doesn't matter. The write is guaranteed to be atomic, so it's impossible to get a 'corrupt' value if you read from the bool at the same time as writing to it. The same is not true for all .NET types (i.e. any structs which aren't primitives and int64 values).
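A minimal sketch of a 'volatile' stop flag, assuming one thread runs the loop and another requests the stop. StopFlagWorker and its members are hypothetical names for this illustration; the sleep stands in for real work.

```csharp
using System.Threading;

// Hypothetical illustration only; not the code from the thread.
public class StopFlagWorker
{
    // volatile: each loop iteration re-reads the field instead of reusing a cached value.
    private volatile bool stop;
    private int iterations;

    // Number of loop iterations completed so far.
    public int Iterations
    {
        get { return Interlocked.CompareExchange(ref iterations, 0, 0); }
    }

    // Body of the background thread.
    public void Run()
    {
        while (!stop)
        {
            Interlocked.Increment(ref iterations);
            Thread.Sleep(1); // stand-in for real work
        }
    }

    // Safe to call from any thread; a bool write cannot be torn.
    public void RequestStop()
    {
        stop = true;
    }
}
```

The flag only tells the loop to finish; a caller that needs to know when the loop has actually ended still has to wait on something, such as Thread.Join or a wait handle like 'stoppingHandle'.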


This bit sounds interesting. Care to elaborate in how AutoResetEvent would improve the solution?

An AutoResetEvent automatically resets to unsignalled once a single thread which is waiting on it wakes up. In this case there is only one thread, so once that thread wakes up, the event resets to unsignalled. Then you can simplify the logic a little because you don't need to handle resetting the handle, you just need to ensure you don't wait on it while there are tasks to process, i.e. while tasks.Count > 0. It's not really worth it though. The existing example is easy to understand and it'd only save one or two LOC ;)
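For comparison, one possible shape of the AutoResetEvent variant is sketched below. It is an assumption about how such a loop might look, not the code from the example above: AutoResetTaskLoop is a hypothetical class, tasks are modelled as Action delegates, and Stop() here does not wait for the loop to finish.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical illustration only; not the code from the thread.
public class AutoResetTaskLoop
{
    private readonly object locker = new object();
    private readonly Queue<Action> tasks = new Queue<Action>();
    private readonly AutoResetEvent handle = new AutoResetEvent(false);
    private volatile bool stop;

    public void EnqueueTask(Action task)
    {
        lock (locker) tasks.Enqueue(task);
        handle.Set(); // stays signalled until the worker consumes it
    }

    public void Stop()
    {
        stop = true;
        handle.Set(); // wake the worker if it is idle
    }

    // Body of the background thread.
    public void Loop()
    {
        while (!stop)
        {
            Action task = null;
            lock (locker)
            {
                if (tasks.Count > 0) task = tasks.Dequeue();
            }

            if (task != null)
            {
                task();
                continue; // only wait once the queue has been seen empty
            }

            handle.WaitOne(); // resets itself when this thread wakes up
        }
    }
}
```

There is no Reset() call anywhere. A Set() that arrives between the emptiness check and WaitOne() is remembered by the event, so the wait returns immediately and the loop rechecks the queue.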

laasunde
January 13th, 2009, 07:18 AM
Thanks for your reply.


There's a race condition ;) You should set stop to false *before* allowing the other thread to run, otherwise it could execute and hit the "if (stop)" line and the loop will stop before you set the variable to false. You also forgot to reset the 'stoppingHandle'.

public void Start()
{
    if (stop)
    {
        stop = false;
        stoppingHandle.Reset();
        reStartHandle.Set();
    }
}


Ok.


You also need to reset the stoppingHandle here.


lock (locker)
{
    if (!stop)
    {
        if (tasks.Count > 0)
            task = tasks.Dequeue();

        if (tasks.Count == 0)
            handle.Reset();
    }
}

There's no need for this. You already check to see if you should exit as soon as you wake up from the handle.


Why do I need to reset it here? Do you mean next to handle.Reset()? Thought resetting it in the Start() method would be enough.



This should really be 'volatile'. That's what you get for quick examples ;) The reason why it has no locking or explicit thread safety is that it doesn't matter. The write is guaranteed to be atomic, so it's impossible to get a 'corrupt' value if you read from the bool at the same time as writing to it. The same is not true for all .NET types (i.e. any structs which aren't primitives and int64 values).

Ok.

Mutant_Fruit
January 13th, 2009, 07:30 AM
Why do I need to reset it here? Do you mean next to handle.Reset()? Thought resetting it in the Start() method would be enough.

When there are no tasks to execute, you want the next call to handle.WaitOne () to block until a task is added to the queue, therefore you have to reset the handle so it is no longer in the 'signalled' state. If you don't reset it when the queue is empty, the loop is essentially a "while (true) { }" loop and will eat CPU.

TheCPUWizard
January 13th, 2009, 08:03 AM
This should really be 'volatile'. That's what you get for quick examples ;) The reason why it has no locking or explicit thread safety is that it doesn't matter. The write is guaranteed to be atomic, so it's impossible to get a 'corrupt' value if you read from the bool at the same time as writing to it. The same is not true for all .NET types (i.e. any structs which aren't primitives and int64 values).

Also "double" is not atomic (although float is), so anytime you use long or double (instead of int or float) you must take into account overlapped access which would allow you to get a value with some of the bits from the old value and some of the bits from the new value, giving a totally unpredictable (in most cases) result....
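One way to guard shared long and double fields against that kind of torn access is the Interlocked class, sketched below. SharedTotals and its member names are hypothetical and only serve the illustration; taking a lock around every access would be another option.

```csharp
using System.Threading;

// Hypothetical illustration only; not the code from the thread.
public class SharedTotals
{
    private long byteCount;
    private double lastRate;

    // Interlocked.Exchange writes all 64 bits as one atomic operation.
    public void SetByteCount(long value)
    {
        Interlocked.Exchange(ref byteCount, value);
    }

    // Interlocked.Read returns a 64-bit value that is never half old, half new.
    public long GetByteCount()
    {
        return Interlocked.Read(ref byteCount);
    }

    public void SetLastRate(double value)
    {
        Interlocked.Exchange(ref lastRate, value);
    }

    // There is no Interlocked.Read for double; a CompareExchange that swaps
    // 0.0 for 0.0 leaves the value as it was and returns it atomically.
    public double GetLastRate()
    {
        return Interlocked.CompareExchange(ref lastRate, 0.0, 0.0);
    }
}
```

Tearing of 64-bit values is a concern on 32-bit platforms, where such a read or write may be carried out as two separate 32-bit operations.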