-
September 23rd, 2010, 04:24 AM
#1
Synchronization between multiple threads !
Problem:
One thread writes a variable 'count', and 20 threads have to read it within a time frame of 30 milliseconds.
The writer thread should write the new value of 'count' for the next cycle only after all 20 reader threads have read the current value within the current 30-millisecond cycle.
The same applies to the reader threads: they should read 'count' in the current cycle only after the writer thread has updated it for that cycle.
Current situation:
The writer thread updates 'count' in the current cycle, but before all 20 reader threads have read that value, the writer updates 'count' again. The remaining reader threads then read the newer value and miss the one they were supposed to read.
(1) How can I synchronize these threads so that they run in the following serial order:
1 writer thread writes 'count', then 20 reader threads read it, then the writer thread writes again, then the 20 reader threads read again, and so on?
The number of reader threads is not fixed at 20. Sometimes there may be only 5 reader threads, sometimes 15.
(2) Which synchronization object should I use, and how? All threads are in the same application.
-
September 23rd, 2010, 08:31 AM
#2
Re: Synchronization between multiple threads !
You can use a condition variable to signal the reader threads that the value has been written and that they can start their cycle. Then use a barrier to check that all reader threads have finished their cycle before the next cycle starts.
Cheers, D Drmmr
Please put [code][/code] tags around your code to preserve indentation and make it more readable.
As long as man ascribes to himself what is merely a possibility, he will not work for the attainment of it. - P. D. Ouspensky
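The condition-variable idea from the reply above could look roughly like the sketch below. It is only an illustration under assumptions: `CycleGate`, `publish`, `consume`, and `run_cycles` are hypothetical names that do not appear in the thread, and it uses the C++11 standard library (`std::mutex`, `std::condition_variable`) rather than any Windows-specific API. The reader count is fixed per gate here, and the 30 ms pacing is left out.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: one writer publishes 'count'; every reader must
// consume it before the writer is allowed to publish the next value.
class CycleGate {
public:
    explicit CycleGate(std::size_t readers) : readers_(readers) {}

    // Writer side: wait until the previous value was read by all readers,
    // then store the new value and wake the readers.
    void publish(int value) {
        std::unique_lock<std::mutex> lock(m_);
        all_read_.wait(lock, [this] { return pending_ == 0; });
        count_ = value;
        pending_ = readers_;
        ++generation_;
        published_.notify_all();
    }

    // Reader side: block until a value newer than 'last_seen' exists,
    // read it, and tell the writer when the last reader is done.
    int consume(unsigned& last_seen) {
        std::unique_lock<std::mutex> lock(m_);
        published_.wait(lock, [&] { return generation_ != last_seen; });
        last_seen = generation_;
        const int value = count_;
        assert(pending_ > 0);
        if (--pending_ == 0) all_read_.notify_one();
        return value;
    }

private:
    std::mutex m_;
    std::condition_variable published_;  // writer -> readers
    std::condition_variable all_read_;   // last reader -> writer
    std::size_t readers_;
    std::size_t pending_ = 0;            // readers that still owe a read
    unsigned generation_ = 0;            // bumped on every publish
    int count_ = 0;
};

// Runs 'cycles' write/read rounds and returns what each reader observed.
std::vector<std::vector<int>> run_cycles(std::size_t readers, int cycles) {
    CycleGate gate(readers);
    std::vector<std::vector<int>> seen(readers);
    std::vector<std::thread> pool;
    for (std::size_t r = 0; r < readers; ++r) {
        pool.emplace_back([&gate, &seen, r, cycles] {
            unsigned last_seen = 0;
            for (int c = 0; c < cycles; ++c)
                seen[r].push_back(gate.consume(last_seen));
        });
    }
    for (int c = 1; c <= cycles; ++c) gate.publish(c);  // writer
    for (auto& t : pool) t.join();
    return seen;
}
```

Calling `run_cycles(20, 100)` would run 20 readers for 100 cycles. To pace the writer at 30 ms, a `std::this_thread::sleep_for` between calls to `publish` would be one option.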
-
September 28th, 2010, 10:26 AM
#3
Re: Synchronization between multiple threads !
Hi D_Drmmr! Thanks for your reply. One thing I forgot to mention: I want a solution for Windows only.
-
September 29th, 2010, 06:38 AM
#4
Re: Synchronization between multiple threads !
Here's a good start. It seems to cycle properly, but I don't claim that everything is cleaned up properly. A writer thread increments a counter at 30 ms intervals and triggers the reader threads. Each reader thread reads the counter once per trigger interval. See the output below.
ThreadCycle.cpp
Code:
// ThreadCycle.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
class ThreadCycle;
typedef struct _ThreadParams
{
_ThreadParams( ThreadCycle* pTC, UINT uIndex )
: pThreadCycle( pTC )
, hCycleCompletedEvent( ::CreateEvent( NULL, TRUE, FALSE, NULL ) )
, uThreadIndex( uIndex )
{
}
~_ThreadParams( )
{
::CloseHandle( hCycleCompletedEvent );
}
void SetCycleCompletedEvent( ) { ::SetEvent( hCycleCompletedEvent ); }
ThreadCycle* pThreadCycle;
HANDLE hCycleCompletedEvent;
UINT uThreadIndex;
} ThreadParams, *LPThreadParams;
typedef std::vector< HANDLE > ThreadList;
typedef std::vector< HANDLE > EventList;
typedef std::vector< LPThreadParams > ThreadParamsList;
class ThreadCycle
{
public:
ThreadCycle( UINT uThreadCount, UINT uCycleInterval )
: m_uThreadCount( uThreadCount )
, m_uCycleInterval( uCycleInterval )
, m_hStartEvent( ::CreateEvent( NULL, TRUE, FALSE, NULL ) )
, m_hStopEvent( ::CreateEvent( NULL, TRUE, FALSE, NULL ) )
, m_hTriggerEvent( ::CreateEvent( NULL, TRUE, FALSE, NULL ) )
, m_hWriterThread( NULL )
, m_lCounter( 0 )
{ }
~ThreadCycle( )
{
Stop( );
::CloseHandle( m_hWriterThread );
::CloseHandle( m_hStartEvent );
::CloseHandle( m_hStopEvent );
::CloseHandle( m_hTriggerEvent );
}
public:
void Start( )
{
tcout << tendl << tendl << _T("Start") << tendl << tendl;
ResetStartEvent( );
ResetStopEvent( );
UINT uIndex = 1;
m_hWriterThread = ( HANDLE ) _beginthreadex( NULL,
0,
WriterThread,
this,
0,
NULL );
for( UINT uCount = 0; uCount < m_uThreadCount; uCount++ )
{
LPThreadParams ptp = new ThreadParams( this, uIndex++ );
m_CycleCompletedEvents.push_back( ptp->hCycleCompletedEvent );
m_ThreadParamsList.push_back( ptp );
m_ReaderThreads.push_back(
( HANDLE ) _beginthreadex( NULL,
0,
ReaderThread,
ptp,
0,
NULL ) );
}
::SetEvent( m_hStartEvent );
}
void Stop( )
{
tcout << tendl << tendl << _T("Stopped") << tendl << tendl;
DWORD dwExitCode = 0;
::SetEvent( m_hStopEvent );
if( NULL != m_hWriterThread )
{
::WaitForSingleObject( m_hWriterThread, 5000 );
// GetExitCodeThread returns a BOOL; the exit code itself comes back in dwExitCode
if( ::GetExitCodeThread( m_hWriterThread, &dwExitCode ) && STILL_ACTIVE == dwExitCode )
{
dwExitCode = 0;
::TerminateThread( m_hWriterThread, dwExitCode );
}
}
if( 0 != m_ReaderThreads.size( ) )
{
for( ThreadList::iterator it = m_ReaderThreads.begin( );
it != m_ReaderThreads.end( );
it++ )
{
HANDLE hThread = (*it);
::WaitForSingleObject( hThread, 5000 );
dwExitCode = 0;
// GetExitCodeThread returns a BOOL; the exit code itself comes back in dwExitCode
if( ::GetExitCodeThread( hThread, &dwExitCode ) && STILL_ACTIVE == dwExitCode )
{
dwExitCode = 0;
::TerminateThread( hThread, dwExitCode );
}
::CloseHandle( hThread );
}
m_ReaderThreads.clear( );
}
for( ThreadParamsList::iterator it = m_ThreadParamsList.begin( );
it != m_ThreadParamsList.end( );
it++ )
{
delete (*it);
}
m_ThreadParamsList.clear( );
m_CycleCompletedEvents.clear( );
}
private:
static UINT WINAPI ReaderThread( LPVOID lpContext )
{
LPThreadParams pTP = static_cast< LPThreadParams > ( lpContext );
// Wait until told to start
::WaitForSingleObject( pTP->pThreadCycle->GetStartEvent( ), INFINITE );
HANDLE hEvents[] = { pTP->pThreadCycle->GetStopEvent( )
, pTP->pThreadCycle->GetTriggerEvent( ) };
while( TRUE )
{
switch( ::WaitForMultipleObjects(
sizeof( hEvents ) / sizeof( HANDLE )
, &hEvents[ 0 ]
, FALSE
, INFINITE ) )
{
case WAIT_OBJECT_0 + 0:
// Exit
pTP->SetCycleCompletedEvent( );
return 1;
case WAIT_OBJECT_0 + 1:
{
// Retrieve the counter
LONG lCounter = pTP->pThreadCycle->GetCounter( );
tcout << _T("Reader thread triggered: ") << lCounter << _T(" ") << pTP->uThreadIndex << tendl;
pTP->SetCycleCompletedEvent( );
::WaitForSingleObject( pTP->pThreadCycle->GetStartEvent( ), INFINITE );
}
break;
}
}
return 0;
}
static UINT WINAPI WriterThread( LPVOID lpContext )
{
ThreadCycle* pThreadCycle = static_cast< ThreadCycle* > ( lpContext );
// Wait until told to start
::WaitForSingleObject( pThreadCycle->GetStartEvent( ), INFINITE );
HANDLE hEvents[] = { pThreadCycle->GetStopEvent( ) };
while( TRUE )
{
switch( ::WaitForMultipleObjects(
sizeof( hEvents ) / sizeof( HANDLE )
, &hEvents[ 0 ]
, FALSE
, pThreadCycle->GetCycleInterval( ) ) )
{
case WAIT_OBJECT_0 + 0:
// Exit
return 1;
case WAIT_TIMEOUT:
tcout << _T("Writer thread cycled") << tendl;
// Reset the start event
pThreadCycle->ResetStartEvent( );
// Increment the counter
pThreadCycle->IncrementCounter( );
// Trigger the event
pThreadCycle->SetTriggerEvent( );
// Wait for the reader threads to finish
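// (bWaitAll must be TRUE; note the limit of MAXIMUM_WAIT_OBJECTS handles per call)
::WaitForMultipleObjects(
static_cast< DWORD >( pThreadCycle->m_CycleCompletedEvents.size( ) )
, &pThreadCycle->m_CycleCompletedEvents[ 0 ]
, TRUE
, INFINITE );
// The completion events are manual-reset, so clear them for the next cycle
for( size_t i = 0; i < pThreadCycle->m_CycleCompletedEvents.size( ); ++i )
{
::ResetEvent( pThreadCycle->m_CycleCompletedEvents[ i ] );
}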
pThreadCycle->ResetTriggerEvent( );
pThreadCycle->SetStartEvent( );
break;
}
}
return 0;
}
const UINT GetCycleInterval( ) { return m_uCycleInterval; }
HANDLE& GetStartEvent( ) { return m_hStartEvent; }
HANDLE& GetStopEvent( ) { return m_hStopEvent; }
HANDLE& GetTriggerEvent( ) { return m_hTriggerEvent; }
void SetStartEvent( ) { ::SetEvent( m_hStartEvent ); }
void ResetStartEvent( ) { ::ResetEvent( m_hStartEvent ); }
void SetStopEvent( ) { ::SetEvent( m_hStopEvent ); }
void ResetStopEvent( ) { ::ResetEvent( m_hStopEvent ); }
void SetTriggerEvent( ) { ::SetEvent( m_hTriggerEvent ); }
void ResetTriggerEvent( ) { ::ResetEvent( m_hTriggerEvent ); }
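void IncrementCounter( )
{
// Atomic read-modify-write; InterlockedExchange( &x, x + 1 ) would read x non-atomically
::InterlockedIncrement( &m_lCounter );
}
const LONG GetCounter( )
{
// Atomic read that leaves the stored value unchanged
return ::InterlockedCompareExchange( &m_lCounter, 0, 0 );
}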
private:
UINT m_uCycleInterval;
UINT m_uThreadCount;
HANDLE m_hTriggerEvent;
HANDLE m_hWriterThread;
EventList m_CycleCompletedEvents;
ThreadList m_ReaderThreads;
ThreadParamsList m_ThreadParamsList;
HANDLE m_hStartEvent;
HANDLE m_hStopEvent;
LONG m_lCounter;
};
int _tmain(int argc, _TCHAR* argv[])
{
ThreadCycle tc( 5, 30 ); // 5 threads; 30 ms
tc.Start( );
Sleep( 2000 );
tc.Stop( );
Sleep( 1000 );
tcout << tendl << tendl << tendl;
tc.Start( );
Sleep( 5000 );
tc.Stop( );
return 0;
}
stdafx.h
Code:
// stdafx.h : include file for standard system include files,
// or project specific include files that are used frequently, but
// are changed infrequently
//
#pragma once
#ifndef _WIN32_WINNT // Allow use of features specific to Windows NT 4 or later.
#define _WIN32_WINNT 0x0500 // Change this to the appropriate value to target Windows 98 and Windows 2000 or later.
#endif
#ifndef _WIN32_IE // Allow use of features specific to IE 4.0 or later.
#define _WIN32_IE 0x0500 // Change this to the appropriate value to target IE 5.0 or later.
#endif
#include "targetver.h"
#include "atlbase.h"
#include <stdio.h>
#include <tchar.h>
#include <iostream>
#include <vector>
#include <process.h>
using namespace std;
#ifdef _UNICODE
#define tcout wcout
#else
#define tcout cout
#endif
#define tendl endl
Output - 5 threads at 30 ms (the numbers next to the text are the counter and thread index)
Code:
Start
Writer thread cycled
Reader thread triggered: 1 3
Reader thread triggered: 1 4
Reader thread triggered: 1 5
Reader thread triggered: 1 1
Reader thread triggered: 1 2
Writer thread cycled
Reader thread triggered: 2 2
Reader thread triggered: 2 4
Reader thread triggered: 2 5
Reader thread triggered: 2 3
Reader thread triggered: 2 1
Writer thread cycled
Reader thread triggered: 3 1
Reader thread triggered: 3 4
Reader thread triggered: 3 5
Reader thread triggered: 3 3
Reader thread triggered: 3 2
Writer thread cycled
Reader thread triggered: 4 3
Reader thread triggered: 4 2
Reader thread triggered: 4 5
Reader thread triggered: 4 1
Reader thread triggered: 4 4
Writer thread cycled
Reader thread triggered: 5 1
Reader thread triggered: 5 2
Reader thread triggered: 5 3
Reader thread triggered: 5 4
Reader thread triggered: 5 5
Writer thread cycled
Reader thread triggered: 6 4
Reader thread triggered: 6 2
Reader thread triggered: 6 3
Reader thread triggered: 6 1
Reader thread triggered: 6 5
Writer thread cycled
Reader thread triggered: 7 2
Reader thread triggered: 7 3
Reader thread triggered: 7 5
Reader thread triggered: 7 1
Reader thread triggered: 7 4
Writer thread cycled
Reader thread triggered: 8 1
Reader thread triggered: 8 5
Reader thread triggered: 8 4
Reader thread triggered: 8 2
Reader thread triggered: 8 3
Writer thread cycled
Reader thread triggered: 9 3
Reader thread triggered: 9 5
Reader thread triggered: 9 2
Reader thread triggered: 9 4
Reader thread triggered: 9 1
Writer thread cycled
Reader thread triggered: 10 4
Reader thread triggered: 10 2
Reader thread triggered: 10 5
Reader thread triggered: 10 3
Reader thread triggered: 10 1
Writer thread cycled
Reader thread triggered: 11 2
Reader thread triggered: 11 4
Reader thread triggered: 11 3
Reader thread triggered: 11 5
Reader thread triggered: 11 1
Writer thread cycled
Reader thread triggered: 12 4
Reader thread triggered: 12 5
Reader thread triggered: 12 2
Reader thread triggered: 12 1
Reader thread triggered: 12 3
Last edited by Arjay; September 29th, 2010 at 12:49 PM.
-
September 30th, 2010, 10:25 AM
#5
Re: Synchronization between multiple threads !
 Originally Posted by Pipliyal
Hi D_Drmmr ! thanks for your reply. But one thing I forgot to mention, I want solution for Windows OS only.
The concepts of a condition variable and a barrier are not specific to any particular platform. In fact, condition variables are part of the next version of the C++ standard.
The Boost.Thread library already implements both.
Cheers, D Drmmr
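As a platform-neutral illustration of the barrier half of this suggestion, here is a minimal sketch. It makes some assumptions: `SimpleBarrier` and `run_with_barrier` are hypothetical names, and the barrier is hand-built from `std::mutex` and `std::condition_variable` because `std::barrier` only arrived in C++20. The writer and all readers meet at the barrier twice per cycle: once after the write, and once after every reader has read.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical reusable barrier: blocks until 'parties' threads have arrived.
class SimpleBarrier {
public:
    explicit SimpleBarrier(std::size_t parties) : parties_(parties) {
        assert(parties > 0);
    }

    void arrive_and_wait() {
        std::unique_lock<std::mutex> lock(m_);
        const unsigned gen = generation_;
        if (++waiting_ == parties_) {
            // Last thread to arrive: open the barrier for this round.
            waiting_ = 0;
            ++generation_;
            cv_.notify_all();
        } else {
            // The generation check guards against spurious wakeups.
            cv_.wait(lock, [&] { return generation_ != gen; });
        }
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::size_t parties_;
    std::size_t waiting_ = 0;
    unsigned generation_ = 0;
};

// Runs 'cycles' write/read rounds and returns what each reader observed.
std::vector<std::vector<int>> run_with_barrier(std::size_t readers, int cycles) {
    SimpleBarrier barrier(readers + 1);  // all readers plus the writer
    int count = 0;                       // only touched between barrier phases
    std::vector<std::vector<int>> seen(readers);
    std::vector<std::thread> pool;
    for (std::size_t r = 0; r < readers; ++r) {
        pool.emplace_back([&, r] {
            for (int c = 0; c < cycles; ++c) {
                barrier.arrive_and_wait();   // phase 1: value has been written
                seen[r].push_back(count);
                barrier.arrive_and_wait();   // phase 2: everyone has read
            }
        });
    }
    for (int c = 1; c <= cycles; ++c) {      // writer
        count = c;
        barrier.arrive_and_wait();           // release the readers
        barrier.arrive_and_wait();           // wait until all have read
    }
    for (auto& t : pool) t.join();
    return seen;
}
```

The same structure should carry over to `boost::barrier` from Boost.Thread, whose `wait()` plays the role of `arrive_and_wait()` here.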
-
October 6th, 2010, 10:42 AM
#6
Re: Synchronization between multiple threads !
In this case, I think you could use a mutex to synchronize the writer thread and the reader threads.
Release the mutex when the writer thread finishes, or when all of the reader threads finish.
My English is not very good, so tell me if anything I said is confusing.
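A mutex on its own only gives mutual exclusion, not the write-then-read ordering asked for, so it would probably need some turn bookkeeping next to it. The following is one possible sketch under that assumption; `TurnState` and its member functions are hypothetical names, not anything from this thread. The mutex guards a cycle number and a count of readers that still owe a read. Readers can attach and detach, which covers the case where the reader count is not fixed. The calls never block, so each thread would have to retry them itself (for example after a yield or a short sleep).

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>

// Hypothetical helper: mutex-guarded turn bookkeeping for one writer and a
// changing set of readers.
class TurnState {
public:
    // A reader joins. It takes part starting with the next published value.
    unsigned attach() {
        std::lock_guard<std::mutex> lock(m_);
        ++registered_;
        return cycle_;  // the caller keeps this as its 'last_seen' cycle
    }

    // A reader leaves. If it still owed a read for the current cycle,
    // that obligation is cancelled so the writer is not blocked forever.
    void detach(unsigned last_seen) {
        std::lock_guard<std::mutex> lock(m_);
        assert(registered_ > 0);
        --registered_;
        if (last_seen != cycle_ && pending_ > 0) --pending_;
    }

    // Writer: succeeds only when every reader has read the previous value.
    bool try_write(int value) {
        std::lock_guard<std::mutex> lock(m_);
        if (pending_ != 0) return false;
        count_ = value;
        pending_ = registered_;
        ++cycle_;
        return true;
    }

    // Reader: succeeds at most once per cycle.
    bool try_read(unsigned& last_seen, int& out) {
        std::lock_guard<std::mutex> lock(m_);
        if (cycle_ == last_seen) return false;  // nothing new for this reader
        last_seen = cycle_;
        out = count_;
        --pending_;
        return true;
    }

private:
    std::mutex m_;
    int count_ = 0;
    std::size_t registered_ = 0;  // readers currently attached
    std::size_t pending_ = 0;     // readers that still owe a read this cycle
    unsigned cycle_ = 0;          // bumped on every successful write
};
```

Polling like this wastes CPU compared with a condition variable or Win32 events, so this is mainly meant to show what state the mutex would have to protect.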