Re: Win32/queue(STL)/threads?
Quote:
So add an MTCounter member to WATCH_DATA. Before the producer calls enqueue(), call MTCounter::increment(). When the consumer thread is done making any more references to the WATCH_DATA pointer, call MTCounter::decrement(). Producer thread calls wait_zero before deleting and exiting. This only works if there is a one-to-one correlation between producer and consumer, and each producer has it's own unique WATCH_DATA instance.
I could do that. A "watch" **is** a WATCH_DATA instance and a thread.
>> EVENT_DATA *ed = new EVENT_DATA(wd, p);
Quote:
I'm assuming that all data within WATCH_DATA pointer is read-only by the consumer - and the consumer does not touch WATCH_DATA::pBuffer.
Yes, in practice, though it's not built-in. The only reason for an EVENT_DATA to have a pointer to its WATCH_DATA is so the (one) consumer thread can read and apply the user-specified options for that watch (action/filename filters, log format, command). Nobody messes with WATCH_DATA::pBuffer.
Quote:
I'm also assuming you are copying data out of p within the constructor, and not keeping any references to p or its memory (which belongs to the producer).
That's correct.
>> delete wd;
Quote:
I also assume that WATCH_DATA's destructor closes handles, and de-allocates any dynamic memory it owns.
That's correct.
>> Mine has (in the last couple days) been boiled down to this.
Quote:
Your consumer thread should look a lot different from the "QueueThread" code that was originally posted as well :)
Not really, since 90% of it is devoted figuring out, and doing, what the user specified (recorded in WATCH_DATA). The control parts are simpler:
Code:
HANDLE hWaitEvents[2] = { hEndQueueEvent, pEventQueue->consumer_event() };
while ( TRUE )
{
if ( pEventQueue->empty() )
{
dwWaitStatus = WaitForMultipleObjects(2, hWaitEvents, FALSE, INFINITE);
}
if ( dwWaitStatus == WAIT_OBJECT_0 ) // shutdown or unload plugin
{
while ( !pEventQueue->empty() )
{
pEventQueue->dequeue(ed);
delete ed;
}
delete pEventQueue;
return 0;
}
pEventQueue->dequeue(ed); // now process ed and delete it
The EVENT_DATA constructor adds a timestamp. I rather like RDCW providing data in little chunks (usually one change) because it makes the timestamps agree more closely with reality. I just tried RDCW synchronously with the same result as with a CompletionRoutine ... data comes in in small chunks and I can still generate/capture 1,000,000 changes in 80 sec. (still 12500/s), losing none. Would your proposed overlapped I/O routine result in bigger chucks of data?
Re: Win32/queue(STL)/threads?
Your consumer is hitting the lock more than it needs to by calling empty. Rely on the return value of dequeue instead:
Code:
HANDLE hWaitEvents[2] = { hEndQueueEvent, pEventQueue->consumer_event() };
while ( TRUE )
{
dwWaitStatus = WaitForMultipleObjects(2, hWaitEvents, FALSE, INFINITE);
if ( dwWaitStatus == WAIT_OBJECT_0 ) // shutdown or unload plugin
{
// TODO - ensure all producers are done producing at this point
while ( pEventQueue->dequeue(ed) )
delete ed;
delete pEventQueue;
return 0;
}
if ( !pEventQueue->dequeue(ed) )
continue; // another consumer beat us to it
// now process ed and delete it
Also notice that all calls to dequeue are checked. If you ever add additional consumers, you can't assume that there's something to consume.
Having said that, I recommend you use dequeue_all instead:
Code:
HANDLE hWaitEvents[2] = { hEndQueueEvent, pEventQueue->consumer_event() };
std::deque<EVENT_DATA*> work_q;
std::deque<EVENT_DATA*>::iterator it, it_end;
while ( TRUE )
{
work_q.clear();
dwWaitStatus = WaitForMultipleObjects(2, hWaitEvents, FALSE, INFINITE);
if ( dwWaitStatus == WAIT_OBJECT_0 ) // shutdown or unload plugin
{
// TODO - ensure all producers are done producing at this point
pEventQueue->dequeue_all(work_q);
it = work_q.begin();
it_end = work_q.end();
for ( ; it != it_end; ++it)
delete *it;
delete pEventQueue;
return 0;
}
pEventQueue->dequeue_all(work_q);
it = work_q.begin();
it_end = work_q.end();
for ( ; it != it_end; ++it)
{
// now process *it and delete it
}
It probably takes a lot longer to process the event than it does for the producer to enqueue it. If the consumer removes 20 events, that's one hit on the lock instead of 20. So you're less likely to have lock contention.
>> Would your proposed overlapped I/O routine result in bigger chucks of data?
It's probably the same as what you've already observed with blocking calls and completion routines.
gg
Re: Win32/queue(STL)/threads?
Quote:
Having said that, I recommend you use dequeue_all instead:
You lost me. I'm using MTqueue. It has no dequeue_all().
But if make the underlying deque public I can whiz through it deleting the EVENT_DATA structs at exit/unload time.
Re: Win32/queue(STL)/threads?