Win32/queue(STL)/threads?

Thread: Win32/queue(STL)/threads?

  1. #1
    Join Date
    Jul 2011
    Posts
    10

    Win32/queue(STL)/threads?

    A queue thread processes a STL queue, basically doing ...

    while ( TRUE )
    {
    if ( pqueue->empty() )
    {
    // auto-reset event
    WaitForSingleObject(hQueueResumeEvent, INFINITE);
    }
    p = pqueue->front();
    // process data
    EnterCriticalSection(&ccQueueWrite);
    pqueue->pop();
    LeaveCriticalSection(&ccQueueWrite);
    }

    In another thread, a CompletionRoutine for ReadDirectoryChangesW() does

    EnterCriticalSection(&ccQueueWrite);
    pqueue->push();
    LeaveCriticalSection(&ccQueueWrite);
    SetEvent(hQueueResumeEvent);

    I find that after that push() and SetEvent(), the queue thread may crash upon accessing pqueue->front().

    In further testing, using "while" (my fix for now) instead of "if" ...

    while ( pqueue->empty() )
    {
    WaitForSingleObject(hQueueResumeEvent, INFINITE);
    }
    p = pqueue->front();

    ... I find that the WaitFor... sometimes happens twice; i.e., pqueue->empty() is TRUE even after the push/SetEvent by another thread.

    Can someone please explain what's happening and maybe suggest other fixes? Don't all threads share the same, coherent view of the queue after pqueue->push() returns?

    Thanks!

    - Vince

  2. #2
    Join Date
    Nov 2003
    Posts
    1,778

  3. #3
    Join Date
    Jul 2011
    Posts
    10

    Re: Win32/queue(STL)/threads?

    Thanks for the link. The queue-processing thread in that sample code contained
    Code:
        for (;;)
        {
            DWORD status = WaitForMultipleObjects(numWaitObjs, waitObjs, 
                                                  FALSE, INFINITE);
            if (status == WO_EXIT)
            {
                break;
            }//if
            else if (status ==  WO_QDATA)
            {
                string msg;
                if (q.dequeue(msg))
                    cout << msg;
            }//else if
            else
            {
                cerr << "Unexpected WFMO status, " << status << endl;
                break;
            }//else
        }//for
    All I would get from that is the "Unexpected WFMO status" message (q.dequeue() returned FALSE). Another thread has pushed to the queue and set the consumer event. Yet, in the queue thread, q.empty() is still TRUE (q.size() == 0). Why does that happen?

  4. #4
    Join Date
    Nov 2003
    Posts
    1,778

    Re: Win32/queue(STL)/threads?

    You are using it wrong. You'll have to post the code.

    gg

  5. #5
    Join Date
    Jul 2011
    Posts
    10

    Re: Win32/queue(STL)/threads?

    Quote Originally Posted by Codeplug View Post
    You are using it wrong. You'll have to post the code.

    gg
    I am not using your code. I want to know why mine fails. Here's the entire CompletionRoutine (for ReadDirectoryChangesW, which is called in a loop in its own thread).

    Code:
    VOID CALLBACK CompletionRoutine(DWORD dwError, DWORD dwBytes, LPOVERLAPPED hOverlap)
    {
    	if ( dwBytes == 0 ) return;
    
    	WATCH_DATA *wd = (WATCH_DATA*) hOverlap->hEvent; // private use of hEvent
    	FILE_NOTIFY_INFORMATION *p = (FILE_NOTIFY_INFORMATION *) wd->pBuffer;
    
    	do
    	{
    		EVENT_DATA *ed = new EVENT_DATA(wd, p);
    		EnterCriticalSection(&ccQueueWrite);
    		wd->dwQueued += 1;
    		pEventQueue->push(ed);
    		LeaveCriticalSection(&ccQueueWrite);
    		SetEvent(hQueueResumeEvent);
    	}
    	while ( p->NextEntryOffset && (p = (FILE_NOTIFY_INFORMATION*) ((LPBYTE)p + p->NextEntryOffset)) );
    }

    And here's the queue thread (less the actual processing of the queued EVENT_DATA structure).

    Code:
    DWORD QueueThread(LPVOID p)
    {
    	pEventQueue = new queue<EVENT_DATA*>;
    	EVENT_DATA *ed;
    
    	while ( TRUE )
    	{
    		if ( bEndQueueThread )
    		{
    			while ( !pEventQueue->empty() )
    			{
    				ed = pEventQueue->front();
    				pEventQueue->pop();
    				delete ed;
    			}
    			delete pEventQueue;
    			return 0;
    		}
    
    		if ( pEventQueue->empty() )
    		{
    			// auto-reset event signalled when something is put into the queue
    			WaitForSingleObject(hQueueResumeEvent, INFINITE);
    		}
    
    		if ( pEventQueue->empty() )
    		{
    			wprintf(L"Something wrong here!\r\n");
    			continue;
    		}
    
    		ed = pEventQueue->front();
    
    		// Processing of Event data omitted
    
    		EnterCriticalSection(&ccQueueWrite);
    		pEventQueue->pop();
    		ed->wd->dwQueued -= 1;
    		LeaveCriticalSection(&ccQueueWrite);
    		delete ed;
    	}
    	return 0;
    }
    When I stress test it (20000 changes in the watched directory, made as quickly as possible) I see the message "Something wrong here!" about 50 times. I just want to know why pEventQueue->empty() is (sometimes) still TRUE after the WaitFor is satisfied (after a push() had been done). Thanks.

    - Vince

  6. #6
    Join Date
    Nov 2003
    Posts
    1,778

    Re: Win32/queue(STL)/threads?

    Any memory locations (variables) that are accessed by two or more threads must be protected by synchronization primitives. The only exception is if all accesses are read-only for the entire duration of all threads.

    Everywhere you see "pEventQueue" being accessed outside the protection of "ccQueueWrite" is undefined behavior.
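To make that rule concrete, here is a minimal sketch in portable C++11. It uses std::mutex as a stand-in for the CRITICAL_SECTION in the posts above (an assumption, made so the example is self-contained), and the names g_queue, g_queueLock, produce and try_consume are hypothetical. The point is only that empty(), front() and pop() sit under the same lock as push().

```cpp
#include <cassert>
#include <mutex>
#include <queue>

// Hypothetical stand-ins for pEventQueue and ccQueueWrite.
std::queue<int> g_queue;
std::mutex g_queueLock;

// Producer side: push under the lock.
void produce(int value)
{
    std::lock_guard<std::mutex> guard(g_queueLock);
    g_queue.push(value);
}

// Consumer side: the emptiness check, the read and the pop all happen
// under the same lock. Returns false if nothing was queued.
bool try_consume(int& out)
{
    std::lock_guard<std::mutex> guard(g_queueLock);
    if (g_queue.empty())
        return false;
    out = g_queue.front();
    g_queue.pop();
    return true;
}

// Usage: items come out in FIFO order, and an empty queue is reported
// instead of being read.
void locked_access_example()
{
    produce(1);
    produce(2);
    int v = 0;
    bool ok = try_consume(v);
    assert(ok && v == 1);
    ok = try_consume(v);
    assert(ok && v == 2);
    ok = try_consume(v);
    assert(!ok);
}
```

Processing of the dequeued item can then happen after try_consume() returns, outside the lock.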

    Here's an even better example, that also demonstrates proper overlapped I/O using ReadDirectoryChanges: http://cboard.cprogramming.com/windo...tml#post966940

    Take my code. Understand/Change/Use it. Post any questions you may have.

    gg

  7. #7
    Arjay (Moderator / MS MVP Power Poster)
    Join Date
    Aug 2004
    Posts
    10,965

    Re: Win32/queue(STL)/threads?

    Quote Originally Posted by vefatica View Post
    I am not using your code. I want to know why mine fails. [...] I just want to know why pEventQueue->empty() is (sometimes) still TRUE after the WaitFor is satisfied (after a push() had been done).
    Codeplug has stated "Any memory locations (variables) that are accessed by two or more threads must be protected by synchronization primitives."

    A common mistake for folks new to multithreaded programming is to synchronize only the writes (adds, deletes, etc.) but not the reads (empty, front, etc.). If you synchronize only the writes, you are essentially not synchronizing anything except one write against another concurrent write: the reads are unprotected, so they can observe the queue in an inconsistent state.

    This is the mistake your code makes above - even the name of your critical section (ccQueueWrite) implies you believe you need to only sync write operations.

    At any rate, you ask why a pEventQueue->empty() may be true sometimes? Hard to say, but you need to sync this call (and the other read operations). If you do that, then things might work properly.
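Separately from the data race, one mechanism worth ruling out is a stale signal. An auto-reset event that is set while no thread is waiting stays signalled until the next wait, so the consumer can later wake up after it has already drained the queue. The following single-threaded model replays one such interleaving. It is an illustrative sketch, not the posted program: AutoResetEventModel and count_empty_wakeups are hypothetical names, and the "event" is just a flag.

```cpp
#include <cassert>
#include <queue>

// Simplified model of a Win32 auto-reset event: set() latches the signal and
// a satisfied wait consumes it. Real events block; this model only reports
// whether a wait would return immediately.
struct AutoResetEventModel
{
    bool signaled = false;
    void set() { signaled = true; }
    bool wait_would_return()
    {
        if (!signaled) return false;
        signaled = false;   // auto-reset: one satisfied wait clears the signal
        return true;
    }
};

// Replays one possible interleaving of the producer and the consumer loop
// from post #5 and counts wake-ups that find the queue empty.
int count_empty_wakeups()
{
    std::queue<int> q;
    AutoResetEventModel ev;
    int emptyWakeups = 0;

    // 1. Consumer is waiting. Producer pushes A and signals; the consumer wakes.
    q.push(1);
    ev.set();
    bool woke = ev.wait_would_return();
    assert(woke);
    (void)woke;

    // 2. While the consumer is still processing A, the producer pushes B and
    //    signals again. Nobody is waiting, so the signal stays latched.
    q.push(2);
    ev.set();

    // 3. Consumer pops A, loops, sees B in the queue, skips the wait, pops B.
    q.pop();
    assert(!q.empty());
    q.pop();

    // 4. Consumer loops again: the queue is empty, so it waits. The latched
    //    signal from step 2 satisfies the wait at once, with nothing queued.
    if (q.empty() && ev.wait_would_return() && q.empty())
        ++emptyWakeups;

    return emptyWakeups;
}
```

A wake-up of this kind is harmless as long as the consumer re-checks empty() under the lock after every wait and simply waits again, which is what the "while" variant in the first post does.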

    Lastly, one thing that I find extremely helpful when working with multithreading is leveraging the principles of RAII (Resource Acquisition Is Initialization) when applied to locking.

    One of the common mistakes is to forget to release a lock (or not release when returning early or an exception is thrown). Using RAII eliminates these sorts of problems.

    Typically a 'locker' object is used on the synchronization primitive. The locker object acquires the lock in its constructor and releases the lock in the destructor. The length of the lock can be easily controlled by the scope of the locking object.
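A minimal sketch of such a locker, assuming nothing about the AutoLock class mentioned below beyond the constructor/destructor behaviour just described. It is written against any type with lock()/unlock() methods, so std::mutex is used here for portability; a thin wrapper around a CRITICAL_SECTION exposing the same two methods would fit equally well. bump_if_below is a hypothetical helper that shows the early-return case.

```cpp
#include <cassert>
#include <mutex>

// Acquires the lock in the constructor and releases it in the destructor.
template <typename Lockable>
class AutoLock
{
public:
    explicit AutoLock(Lockable& l) : m_lock(l) { m_lock.lock(); }
    ~AutoLock() { m_lock.unlock(); }

    AutoLock(const AutoLock&) = delete;             // a lock holder is not copyable
    AutoLock& operator=(const AutoLock&) = delete;

private:
    Lockable& m_lock;
};

// Increments a shared counter unless it has reached the limit.
// Both return paths release the lock through the destructor.
bool bump_if_below(std::mutex& m, int& counter, int limit)
{
    AutoLock<std::mutex> guard(m);
    if (counter >= limit)
        return false;   // early return: guard's destructor unlocks
    ++counter;
    return true;
}

void autolock_example()
{
    std::mutex m;
    int counter = 0;
    bool first = bump_if_below(m, counter, 1);
    bool second = bump_if_below(m, counter, 1);
    assert(first && !second && counter == 1);

    bool lockIsFree = m.try_lock();   // succeeds only if the lock was released
    assert(lockIsFree);
    if (lockIsFree) m.unlock();
}
```

The standard library's std::lock_guard does the same job; the hand-written class is shown only to make the mechanism visible.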

    For example, if I wanted to lock the complete code above using RAII, I could do the following (where AutoLock is a 'locker' class and m_csQueueLock is an instance of a wrapped critical section lock object):

    Code:
    while ( TRUE )
    {
      AutoLock lock( &m_csQueueLock );  // RAII: acquired here, released automatically at the end of each loop iteration
    
      if ( bEndQueueThread )
      {
        while ( !pEventQueue->empty() )
        {
          ed = pEventQueue->front();
          pEventQueue->pop();
          delete ed;
        }
        delete pEventQueue;
        return 0;
      }
    
      if ( pEventQueue->empty() )
      {
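        // auto-reset event signalled when something is put into the queue
        // Caution: the queue lock is still held during this wait, so a producer that needs
        // the same lock to push() is blocked; real code must release the lock before waiting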
        WaitForSingleObject(hQueueResumeEvent, INFINITE);
      }
    
      if ( pEventQueue->empty() )
      {
        wprintf(L"Something wrong here!\r\n");
        continue;
      }
    
      ed = pEventQueue->front();
    
      // Processing of Event data omitted
    
      pEventQueue->pop();
      ed->wd->dwQueued -= 1;
      delete ed;
    }
    The code above is a quick and dirty way to ensure all reads and writes are synchronized.

    In real code you wouldn't want to hold the lock for that long. In fact, if you are storing pointers to objects in the queue, you only need to hold the lock while performing actual queue functions (front, pop, empty, etc.), but not hold the lock while processing the data.

    In this case of using pointers, it would be quite simple to create a thread-safe queue that wouldn't require any external locking. Callers would simply call the queue methods, and all the thread synchronization would be encapsulated.
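As a rough illustration of that idea, here is a sketch of a queue that hides its own locking. It is written in portable C++11 with std::mutex and std::condition_variable instead of a critical section and an event (a substitution made so the example is self-contained), and ThreadSafeQueue and sum_through_queue are hypothetical names rather than anything from the linked samples.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

template <typename T>
class ThreadSafeQueue
{
public:
    void enqueue(T item)
    {
        {
            std::lock_guard<std::mutex> guard(m_mutex);
            m_items.push(std::move(item));
        }
        m_notEmpty.notify_one();   // wake one waiting consumer
    }

    // Blocks until an item is available, then removes and returns it.
    // The predicate is re-checked under the lock after every wake-up,
    // so waking up to an empty queue just means waiting again.
    T dequeue()
    {
        std::unique_lock<std::mutex> guard(m_mutex);
        m_notEmpty.wait(guard, [this] { return !m_items.empty(); });
        T item = std::move(m_items.front());
        m_items.pop();
        return item;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_notEmpty;
    std::queue<T> m_items;
};

// Usage: one producer thread feeds the queue while the calling thread
// consumes. Neither side touches a lock directly.
long sum_through_queue(int count)
{
    ThreadSafeQueue<int> q;
    std::thread producer([&q, count] {
        for (int i = 1; i <= count; ++i)
            q.enqueue(i);
    });

    long sum = 0;
    for (int i = 0; i < count; ++i)
        sum += q.dequeue();

    producer.join();
    return sum;
}
```

Because the lock is held only inside enqueue() and dequeue(), the caller is free to process each item for as long as it likes without blocking the producer.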

    I can't stress enough the importance of encapsulating the thread synchronization inside a class as much as possible. If you share a resource between threads, don't encapsulate, and force the callers to synchronize every place they use the resource, there are bound to be threading issues (because someone is likely to screw it up). It also makes troubleshooting that much more difficult if you have locks all over the place rather than centralized within a single class.
    Last edited by Arjay; July 3rd, 2011 at 03:05 PM.

  8. #8
    Join Date
    Jul 2011
    Posts
    10

    Re: Win32/queue(STL)/threads?

    Thanks, Arjay. I am aware of many of the issues you brought up. But I'm not much of a ++ person. I implemented the MTqueue class that Codeplug referred me to (not understanding it very well, especially the no_copy part). Building the synchronization into the objects themselves certainly saves me having to keep track (possibly poorly) of where it's needed.

    One more question. In MTqueue's enqueue() and dequeue() I wanted to call a method of a member structure of the object being [un]queued. I hacked the template by adding some type-specific stuff to enqueue() and dequeue() (ugh!). Is there a more proper way to do that?

    Thanks.

    - Vince

  9. #9
    Join Date
    Nov 2003
    Posts
    1,778

    Re: Win32/queue(STL)/threads?

    >> (not understanding it very well, especially the no_copy part)
    The no_copy class prevents that object from being copied, since the copy constructor and operator= are private. By inheriting from it, you are saying: "this object has no copy semantics and can't be copied".
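The linked no_copy class is not reproduced in this thread, so the following is only a sketch of the idiom as described: copy operations declared private and never defined. Counter is a hypothetical class used to show the effect of inheriting from it.

```cpp
#include <cassert>
#include <type_traits>

// Base class whose copy constructor and copy assignment are private,
// which makes every derived class non-copyable as well.
class no_copy
{
protected:
    no_copy() {}
    ~no_copy() {}

private:
    no_copy(const no_copy&);             // declared, never defined
    no_copy& operator=(const no_copy&);  // declared, never defined
};

// Hypothetical class that opts out of copying by inheriting from no_copy.
class Counter : public no_copy
{
public:
    Counter() : m_value(0) {}
    void bump() { ++m_value; }
    int value() const { return m_value; }

private:
    int m_value;
};

// The compiler now refuses to generate copies of Counter.
static_assert(!std::is_copy_constructible<Counter>::value, "Counter must not be copy-constructible");
static_assert(!std::is_copy_assignable<Counter>::value, "Counter must not be copy-assignable");

void no_copy_example()
{
    Counter c;
    c.bump();
    assert(c.value() == 1);
    // Counter d = c;   // would not compile: the copy constructor is inaccessible
}
```

In C++11 and later the same intent is usually written with "= delete" on the two copy operations; the private-declaration form is the older equivalent.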

    >> I hacked the template ... Is there a more proper way to do that?
    Yeah - don't do that!

    >> In MTqueue's enqueue() and dequeue() I wanted to call a method of a member structure of the object
    So the question is, why would you want to do that within the enqueue/dequeue methods? Why not just make the call before/after calling enqueue/dequeue?

    In the sample ReadDirectoryChanges code I linked to, take notice of how change notifications are accumulated before being posted to the consumer - which then calls dequeue_all to get all changes in one fell swoop. This handles things more efficiently if there are a lot of changes that occur at once.

    gg

  10. #10
    Join Date
    Jul 2011
    Posts
    10

    Re: Win32/queue(STL)/threads?

    Quote Originally Posted by Codeplug View Post
    >> In MTqueue's enqueue() and dequeue() I wanted to call a method of a member structure of the object
    So the question is, why would you want to do that within the enqueue/dequeue methods? Why not just make the call before/after calling enqueue/dequeue?

    In the sample ReadDirectoryChanges code I linked to, take notice of how change notifications are accumulated before being posted to the consumer - which then calls dequeue_all to get all changes in one fell swoop. This handles things more efficiently if there are a lot of changes that occur at once.

    gg
    The queue may serve many directory watches (watch = struct + thread). I wanted to update a particular watch's "dwQueued" member while the synchronization was in effect. I can provide my own synchronization for that.

    What if, instead of using the template as a template, I used it as a model for a class declaration of my own, with any app-specific stuff built in?

    Used as I use it (async with CompletionRoutine), ReadDirectoryChangesW doesn't accumulate many changes before calling the CompletionRoutine ... one most often ... up to ten or fifteen.

    The CompletionRoutine enqueues EVENT_DATA pointers (basically a FILE_NOTIFY_INFORMATION struct and a timestamp) and the queue dequeues and logs them.

    Depending on how I generate rapid changes in a directory I can capture/log them at 10,000-20,000 per second, losing none. Typical is ~12,500/s with one watch going on and one instance of another app calling CreateFile/DeleteFile as fast as it can.

    - Vince

  11. #11
    Join Date
    Nov 2003
    Posts
    1,778

    Re: Win32/queue(STL)/threads?

    Is WATCH_DATA::dwQueued any different from MTqueue::size()?

    Keep in mind that your CompletionRoutine is only called when the thread that called ReadDirectoryChanges() enters an alertable wait state. The advantage of using regular overlapped I/O like in my sample is that you spend less time dealing with lock contention in the consumer.

    gg

  12. #12
    Join Date
    Jul 2011
    Posts
    10

    Re: Win32/queue(STL)/threads?

    Quote Originally Posted by Codeplug View Post
    Is WATCH_DATA::dwQueued any different from MTqueue::size()?

    Keep in mind that your CompletionRoutine is only called when the thread that called ReadDirectoryChanges() enters an alertable wait state. The advantage of using regular overlapped I/O like in my sample is that you spend less time dealing with lock contention in the consumer.

    gg
    Yes, WATCH_DATA::dwQueued may not be the same as pEventQueue->size(); the queue and its thread may serve any number of directory watches.

    RDCW sample? I missed it. I'm quite comfortable with CALLBACK functions (and pretty happy with its performance) and otherwise naive about overlapped I/O. The RDCW thread uses an alertable wait (WaitForSingleObjectEx) for an end-thread event; calls to the completion routine satisfy the wait.

    In fact, when a watch thread ends, it does

    Code:
    // queued events might refer to wd
    while ( wd->dwQueued )
    {
    	Sleep(1);
    }
    delete wd;
    So it's important that WATCH_DATA::dwQueued is accurate. I'd really like it tied closely to queue pushing and popping.

    Thanks gg.

    - Vince

  13. #13
    Join Date
    Nov 2003
    Posts
    1,778

    Re: Win32/queue(STL)/threads?

    >> RDCW sample? I missed it.
    http://cboard.cprogramming.com/windo...tml#post966940
    Notice the queue_MT::enqueue(It first, It last) and queue_MT::dequeue_all() methods. By adding to the queue in batches, and consuming the entire queue at once, lock contention can be significantly reduced. Not as easy to achieve within a callback.
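The batching idea can be sketched as follows. This is not the queue_MT code from the linked post, which is not shown in this thread; BatchQueue is a hypothetical name, std::mutex stands in for the critical section, and the consumer notification is left out to keep the example short.

```cpp
#include <cassert>
#include <deque>
#include <mutex>
#include <vector>

template <typename T>
class BatchQueue
{
public:
    // Adds a whole range of items under a single lock acquisition.
    template <typename It>
    void enqueue(It first, It last)
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        m_items.insert(m_items.end(), first, last);
    }

    // Hands the caller everything queued so far. The lock is held only
    // for the swap, not while the caller processes the items.
    std::deque<T> dequeue_all()
    {
        std::deque<T> out;
        std::lock_guard<std::mutex> guard(m_mutex);
        out.swap(m_items);
        return out;
    }

private:
    std::mutex m_mutex;
    std::deque<T> m_items;
};

void batch_example()
{
    BatchQueue<int> q;
    std::vector<int> changes = {1, 2, 3};
    q.enqueue(changes.begin(), changes.end());   // one lock for three items

    std::deque<int> got = q.dequeue_all();       // one lock to take them all
    assert(got.size() == 3 && got.front() == 1 && got.back() == 3);
    assert(q.dequeue_all().empty());
}
```

With this shape, a burst of N notifications costs two lock acquisitions instead of 2N, which is where the reduction in contention comes from.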

    >> So it's important that WATCH_DATA::dwQueued is accurate.
    You shouldn't have to do that. To ensure that the consumer thread is done with that WATCH_DATA instance, just wait for the consumer thread to exit by waiting on its thread handle. Then you can remove WATCH_DATA::dwQueued altogether.

    gg

  14. #14
    Join Date
    Jul 2011
    Posts
    10

    Re: Win32/queue(STL)/threads?

    Quote Originally Posted by Codeplug View Post
    >> RDCW sample? I missed it.
    [...]
    >> So it's important that WATCH_DATA::dwQueued is accurate.
    You shouldn't have to do that. To ensure that the consumer thread is done with that WATCH_DATA instance, just wait for the consumer thread to exit by waiting on its thread handle. Then you can remove WATCH_DATA::dwQueued altogether.

    gg
    The consumer thread doesn't exit; it may be serving other watches (or be re-used in the future). I guess I could stop it and restart it as necessary. An older (by years) design had one consumer thread per producer thread.

    My project is a plug-in (DLL) for a command processor (syntax attached). I must also worry about the user X-ing the console or simply unloading the plugin. The host app's plugin API offers wildcard and regex matching (for user file specs) and the ability to execute interpreted commands triggered by consumer events (not recommended during stress tests :-)

    A quick look at the producer thread in your link makes me think "Whew!". Mine has (in the last couple days) been boiled down to this.

    Code:
    VOID CALLBACK CompletionRoutine(DWORD dwError, DWORD dwBytes, LPOVERLAPPED hOverlap)
    {
    	if ( dwBytes == 0 )
    		return;
    
    	WATCH_DATA *wd = (WATCH_DATA*) hOverlap->hEvent; // private use of hEvent
    	FILE_NOTIFY_INFORMATION *p = (FILE_NOTIFY_INFORMATION *) wd->pBuffer;
    
    	do
    	{
    		EVENT_DATA *ed = new EVENT_DATA(wd, p);
    		pEventQueue->enqueue(ed);
    	}
    	while ( p->NextEntryOffset && (p = (FILE_NOTIFY_INFORMATION*) ((LPBYTE)p + p->NextEntryOffset)) );
    }
    
    DWORD WatchThread(LPVOID p)
    {
    	WATCH_DATA *wd = (WATCH_DATA *) p;
    	DWORD dwWritten, dwWaitResult;
    
    	while ( TRUE )
    	{
    		ReadDirectoryChangesW(wd->hWatchDir, wd->pBuffer, BUFSIZE, wd->bWatchSubdirs,
    			wd->dwOptions, &dwWritten, &wd->Overlapped, CompletionRoutine);
    
    		// wait is satisfied by calls to the completion routine
    		dwWaitResult = WaitForSingleObjectEx(wd->hEndWatchThreadEvent, INFINITE, TRUE);
    		if ( dwWaitResult == WAIT_OBJECT_0 ) break;
    	}
    
    	// queued events might refer to wd
    	while ( wd->dwQueued )
    	{
    		Sleep(1);
    	}
    	delete wd;
    	return 0;
    }

  15. #15
    Join Date
    Nov 2003
    Posts
    1,778

    Re: Win32/queue(STL)/threads?

    >> while ( wd->dwQueued )
    This is unsynchronized access to a shared variable. Can't do that.

    Here is something simple you could do:
    Code:
    class MTCounter : public no_copy
    {
        Event m_zeroEvent;
        CriticalSection m_cs;
        int m_counter;
    
        typedef CriticalSection::scoped_lock lock;
    
    public:
        MTCounter() : m_counter(0)
        {
            if (!m_zeroEvent.Create(true, true)) // manual reset, signaled
                throw std::runtime_error("MTCounter: failed to create event");
        }//constructor
    
        void increment()
        {
            lock l(m_cs);
            ++m_counter;
            m_zeroEvent.Reset();
        }//increment
    
        void decrement()
        {
            lock l(m_cs);
            if (m_counter == 0)
                throw std::runtime_error("MTCounter::decrement: count==0");
            --m_counter;
            if (m_counter == 0)
                m_zeroEvent.Signal();
        }//decrement
    
        bool wait_zero(DWORD to = INFINITE)
        {
            return ::WaitForSingleObject(m_zeroEvent, to) == WAIT_OBJECT_0;
        }//wait_zero
    };//MTCounter
    So add an MTCounter member to WATCH_DATA. Before the producer calls enqueue(), call MTCounter::increment(). When the consumer thread is done making any more references to the WATCH_DATA pointer, call MTCounter::decrement(). The producer thread calls wait_zero() before deleting and exiting. This only works if there is a one-to-one correlation between producer and consumer, and each producer has its own unique WATCH_DATA instance.
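The call order described above can be tried out with a portable analogue. The sketch below is an assumption-laden stand-in, not the MTCounter class itself: ZeroCounter uses std::condition_variable in place of the manual-reset event, WatchData is a hypothetical placeholder for WATCH_DATA, and the enqueue/dequeue steps are reduced to comments.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Portable analogue of the counter: wait_zero() blocks until every
// increment() has been matched by a decrement().
class ZeroCounter
{
public:
    void increment()
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        ++m_count;
    }

    void decrement()
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        assert(m_count > 0);
        if (--m_count == 0)
            m_zero.notify_all();
    }

    void wait_zero()
    {
        std::unique_lock<std::mutex> guard(m_mutex);
        m_zero.wait(guard, [this] { return m_count == 0; });
    }

    int count()
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        return m_count;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_zero;
    int m_count = 0;
};

struct WatchData            // hypothetical stand-in for WATCH_DATA
{
    ZeroCounter queued;
};

// Producer increments before each hand-off, the consumer decrements when it
// is finished with an event, and the producer waits for zero before cleanup.
int pending_after_wait(int events)
{
    WatchData wd;
    for (int i = 0; i < events; ++i)
        wd.queued.increment();          // would precede each enqueue()

    std::thread consumer([&wd, events] {
        for (int i = 0; i < events; ++i)
            wd.queued.decrement();      // would follow the last use of wd per event
    });

    wd.queued.wait_zero();              // producer blocks until all events are released
    int pending = wd.queued.count();
    consumer.join();                    // demo tidy-up before wd goes out of scope
    return pending;
}
```

Unlike the Sleep(1) polling loop, the waiting thread here sleeps until the count actually reaches zero, and every access to the count is made under the lock.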

    >> EVENT_DATA *ed = new EVENT_DATA(wd, p);
    I'm assuming that all data within WATCH_DATA pointer is read-only by the consumer - and the consumer does not touch WATCH_DATA::pBuffer.

    I'm also assuming you are copying data out of p within the constructor, and not keeping any references to p or its memory (which belongs to the producer).

    >> delete wd;
    I also assume that WATCH_DATA's destructor closes handles, and de-allocates any dynamic memory it owns.

    >> Mine has (in the last couple days) been boiled down to this.
    Your consumer thread should look a lot different from the "QueueThread" code that was originally posted as well :)

    gg
