I'm trying to develop a consumer thread that handle asynchronous operations. This has been quite trick so far and I'm not completely comfortable with all aspects of multi-threading. Therefore I was hoping someone here could review the code.
The variables used (and interface) :
Here is ThreadQueueCode:public interface IJob
{
void Execute();
}
enum StateTypes { Created, Running, Waiting, Stopping, Stopped }
private volatile StateTypes state;
private object stateLocker = new object();
private ThreadQueue queue = new ThreadQueue();
The ThreadMain method is executed in a separate background thread. The methods Add() , StopExecuting() and StartExecuting() will be executed from various other threads.Code:private class ThreadQueue
{
private Queue<IJob> queue = new Queue<IJob>();
private object locker = new object();
internal IJob Dequeue()
{
lock (locker)
{
if (queue.Count > 0)
{
return queue.Dequeue();
}
return null;
}
}
internal void Enqueue(IJob job)
{
lock (locker) queue.Enqueue(job);
}
internal void Clear()
{
lock (locker) queue.Clear();
}
internal int Count()
{
lock (locker) return queue.Count;
}
}
Code:void ThreadMain()
{
while (bThreadStop) {
IJob job;
lock stateLocker) {
if (state == Running) job = queue.Dequeue();
}
if (job != null) job.Execute();
lock(stateLocker) {
if (state = Running && job == null && queue.Count == 0) {
state = waiting;
while(state != waiting) Monitor.Wait(stateLocker);
}
if (state == Stopping) {
state = Stopped;
if (isStopping) Monitor.Pulse(stateLocker)
while(state != Stopped) Monitor.Wait(stateLocker);
}
}
}
}
/* This method shall block until worker thread is actually not executing any tasks */
private void StopExecuting()
{
lock (stateLocker) {
/* Return if state already is stopped*/
if (state == Stopped || state == Waiting) return ;
/* Set state stopping and wait for pulse and state equal Stopped */
state = Stopping;
while(state != Stopped) Monitor.Wait(stateLocker);
}
}
private void StartExecuting()
{
lock (stateLocker) {
if (state == stopped)
{
state = Running;
Monitor.Pulse(stateLocker);
}
}
}
public void Add(IJob job)
{
queue.Enqueue(job);
lock (stateLocker) {
if (state == Waiting) Monitor.Pulse();
}
}

