I'm trying to develop a consumer thread that handle asynchronous operations. This has been quite trick so far and I'm not completely comfortable with all aspects of multi-threading. Therefore I was hoping someone here could review the code.
The variables used (and interface) :
Here is ThreadQueueCode:public interface IJob { void Execute(); } enum StateTypes { Created, Running, Waiting, Stopping, Stopped } private volatile StateTypes state; private object stateLocker = new object(); private ThreadQueue queue = new ThreadQueue();
The ThreadMain method is executed in a separate background thread. The methods Add() , StopExecuting() and StartExecuting() will be executed from various other threads.Code:private class ThreadQueue { private Queue<IJob> queue = new Queue<IJob>(); private object locker = new object(); internal IJob Dequeue() { lock (locker) { if (queue.Count > 0) { return queue.Dequeue(); } return null; } } internal void Enqueue(IJob job) { lock (locker) queue.Enqueue(job); } internal void Clear() { lock (locker) queue.Clear(); } internal int Count() { lock (locker) return queue.Count; } }
Code:void ThreadMain() { while (bThreadStop) { IJob job; lock stateLocker) { if (state == Running) job = queue.Dequeue(); } if (job != null) job.Execute(); lock(stateLocker) { if (state = Running && job == null && queue.Count == 0) { state = waiting; while(state != waiting) Monitor.Wait(stateLocker); } if (state == Stopping) { state = Stopped; if (isStopping) Monitor.Pulse(stateLocker) while(state != Stopped) Monitor.Wait(stateLocker); } } } } /* This method shall block until worker thread is actually not executing any tasks */ private void StopExecuting() { lock (stateLocker) { /* Return if state already is stopped*/ if (state == Stopped || state == Waiting) return ; /* Set state stopping and wait for pulse and state equal Stopped */ state = Stopping; while(state != Stopped) Monitor.Wait(stateLocker); } } private void StartExecuting() { lock (stateLocker) { if (state == stopped) { state = Running; Monitor.Pulse(stateLocker); } } } public void Add(IJob job) { queue.Enqueue(job); lock (stateLocker) { if (state == Waiting) Monitor.Pulse(); } }




Reply With Quote