CodeGuru Home VC++ / MFC / C++ .NET / C# Visual Basic VB Forums Developer.com
Results 1 to 14 of 14
  1. #1
    Join Date
    Jan 2012
    Location
    USA
    Posts
    91

    [RESOLVED] I need some help with mutex and condition variable.

    [UPDATED ON POST #10]

    Hello,

    I am writing an app that downloads the data over the sockets and processes them. The app below randomly generates data instead of downloading to simplify the task for now. I am on Ubuntu.

    So, the first thread generates a float number and adds it to a global vector. Thread two finds an average of all data in the vector. I expect from the program on iteration one: first thread add data to vector, then second thread calculate the average; iteration two: do the same; etc. However here is the log from the second thread:

    process_data - size: 33 average: 0.514959
    process_data - size: 77 average: 0.522389
    process_data - size: 77 average: 0.522389
    process_data - size: 86 average: 0.526629
    process_data - size: 86 average: 0.526629
    process_data - size: 100 average: 0.546825

    Could someone take a look at the code to tell me what am I doing wrong? -- Thank you!

    Build command:
    Code:
    g++ -g -O0 -fno-inline -std=c++0x -pthread test.cpp -o test

    Code:
    #include<pthread.h> // pthread_create(), mutex and condition variable API
    #include<time.h>    // nanosleep(), struct timespec
    #include<unistd.h>  // unlink()
    #include<cerrno>    // errno, EINTR
    #include<cstdlib>   // rand()
    #include<deque>
    #include<fstream>
    #include<iostream>
    #include<string>
    using namespace std;
    
    // Not referenced anywhere else in the code shown in this post.
    int DEQUE_SIZE = 30;
    // Log file names; Run() unlinks both files at startup.
    string datafile_name = "z_test_quotes.log";
    string processfile_name = "z_test_process.log";
    // Handles of the receive and process threads, filled in by pthread_create() in Run().
    pthread_t quote_thread, process_thread;
    // Locked by both thread functions around every access to quotes_deque and newquote_flag.
    pthread_mutex_t quotes_mutex = PTHREAD_MUTEX_INITIALIZER;
    // Signalled by the receive thread after each new quote; waited on by the process thread.
    pthread_cond_t  quote_condition = PTHREAD_COND_INITIALIZER;
    // Shared container: the receive thread appends to it, the process thread takes a copy of it.
    deque<float> quotes_deque;
    // Wait predicate: set to true by the receive thread after a push_back,
    // reset to false by the process thread once it has copied the deque.
    bool newquote_flag = false;
    
    // Thread entry points passed to pthread_create() in Run().
    void* thfun_receive_quotes(void* arg);
    void* thfun_process_quotes(void* arg);
    // Writes str followed by a newline to std::cerr.
    void my_err(string str);
    // Sleeps via nanosleep() for the given number of nanoseconds.
    void my_nsleep(int nanoseconds);
    // Appends str as one line to the file file_name; returns 0 on success, -1 on failure.
    int my_log(string file_name, string str);
    // Creates both threads and joins them.
    int Run();
    
    // Program entry point.
    // Forwards Run()'s return value as the process exit status instead of
    // discarding it, so a failure reported by Run() is visible to the caller.
    int main()
    {
        return Run();
    }
    
    // Removes the old log files, starts the receive and process threads and
    // waits for both of them to finish.
    //
    // Returns 0 on success, or the non-zero error code returned by
    // pthread_create() if one of the threads could not be started.
    int Run()
    {
        my_err(" ******  starting process... ****** ");

        // Best effort: the files may simply not exist yet, so the result is ignored.
        unlink(datafile_name.c_str());
        unlink(processfile_name.c_str());

        // pthread_create() does not set the usual errno but returns the error code.
        int rc = pthread_create(&quote_thread, NULL, thfun_receive_quotes, NULL);
        if ( rc != 0 )
        {
            my_err(" error creating thread quote_thread (code " + std::to_string(rc) + ") ");
            return rc;   // nothing was started, so there is nothing to join
        }

        rc = pthread_create(&process_thread, NULL, thfun_process_quotes, NULL);
        if ( rc != 0 )
            my_err(" error creating thread process_thread (code " + std::to_string(rc) + ") ");

        pthread_join( quote_thread, NULL);

        // Join the process thread only if it was really created; joining a
        // pthread_t that was never started is undefined behaviour.
        if ( rc == 0 )
            pthread_join( process_thread, NULL);

        my_err(" ******  terminating process ******* ");

        return rc;
    }
    
    // Set to true (while holding quotes_mutex) by the receive thread after it
    // has produced its last quote. It is part of the process thread's wait
    // predicate, so that thread can leave its loop on its own.
    static bool quotes_done_flag = false;

    // thread function
    // Producer: generates 500 pseudo-random quotes in [0, 1], appends each one
    // to quotes_deque under quotes_mutex and signals quote_condition.
    // When finished it requests a cooperative shutdown of the process thread.
    // arg is unused. Always returns NULL.
    void* thfun_receive_quotes(void* arg)
    {
        (void)arg;
        my_err(" ******  starting receive thread ****** ");

        for (unsigned long i = 0; i < 500; i++)
        {
            my_nsleep(1);   // Simulate delay.

            const float quote = rand() / static_cast<float>(RAND_MAX);   // Simulate get quotes.

            pthread_mutex_lock( &quotes_mutex );
            quotes_deque.push_back(quote);
            newquote_flag = true;
            pthread_mutex_unlock( &quotes_mutex );

            pthread_cond_signal( &quote_condition );
        }

        // Cooperative shutdown instead of pthread_cancel(): cancellation can hit
        // the consumer at any cancellation point (for example inside
        // pthread_cond_wait(), which re-acquires the mutex first), and it could
        // also be issued before process_thread has even been created.
        pthread_mutex_lock( &quotes_mutex );
        quotes_done_flag = true;
        pthread_mutex_unlock( &quotes_mutex );
        pthread_cond_signal( &quote_condition );

        my_err(" ******  terminating receive thread ****** ");
        return NULL;
    }

    // thread function
    // Consumer: waits until new quotes have arrived or the producer has
    // finished, takes a private copy of quotes_deque under quotes_mutex and
    // does the (simulated) calculation on that copy with the mutex released,
    // so the receive thread is never blocked by the calculation.
    // arg is unused. Always returns NULL.
    void* thfun_process_quotes(void* arg)
    {
        (void)arg;
        my_err(" ******  starting process thread ****** ");
        std::deque<float> quotes;

        for (;;)
        {
            pthread_mutex_lock( &quotes_mutex );

            // Predicate loop: guards against spurious wakeups and skips the
            // wait entirely when data (or the shutdown request) is already there.
            while ( !newquote_flag && !quotes_done_flag )
                pthread_cond_wait( &quote_condition, &quotes_mutex );

            const bool have_new_quotes = newquote_flag;
            const bool producer_done = quotes_done_flag;
            if ( have_new_quotes )
            {
                quotes = quotes_deque;
                newquote_flag = false;
            }
            pthread_mutex_unlock( &quotes_mutex );

            if ( have_new_quotes )
            {
                // Add calculations here.
                const size_t size = quotes.size();
                my_nsleep(1000);         // Simulate calculations. At 1 usec it starts skipping every other quote.
                quotes.clear();
                // End of calculations.

                // Logging.
                const std::string str = "process_quotes - size: " + std::to_string(size) + " * ";
                std::cout << str << std::endl;
                //    kk_log(processfile_name, str);
            }

            // Leave only after the last batch has been processed.
            if ( producer_done )
                break;
        }

        my_err(" ****** terminating process thread ****** ");
        return NULL;
    }
    
    // Prints one diagnostic line on the standard error stream.
    // str: message text; a newline is appended and the stream is flushed.
    void my_err(std::string str)
    {
        std::cerr << str;
        std::cerr << std::endl;
        std::cerr.flush();
    }
    
    // Suspends the calling thread for at least the given number of nanoseconds.
    //
    // nanoseconds: requested delay. Negative values return immediately.
    //              Values of one second or more are split into seconds and
    //              nanoseconds, because nanosleep() rejects a tv_nsec outside
    //              [0, 999999999] with EINVAL and would then not sleep at all.
    // If the sleep is interrupted by a signal (EINTR) it is resumed for the
    // remaining time.
    void my_nsleep(int nanoseconds)
    {
        if (nanoseconds < 0)
            return;

        const long kNanosPerSecond = 1000000000L;

        struct timespec remaining;
        remaining.tv_sec  = nanoseconds / kNanosPerSecond;
        remaining.tv_nsec = nanoseconds % kNanosPerSecond;

        // POSIX allows the request and remainder arguments to be the same object.
        while (nanosleep(&remaining, &remaining) == -1 && errno == EINTR)
        {
            // interrupted: go back to sleep for what is left
        }
    }
    
    // Appends str as one line to the file file_name, creating the file if needed.
    //
    // Returns 0 on success and -1 if the file cannot be opened or the line
    // cannot be written; in both failure cases a message is sent to my_err().
    int my_log(std::string file_name, std::string str)
    {
        std::ofstream fileStream(file_name.c_str(), std::ios::out | std::ios::app);

        if (!fileStream.is_open())
        {
            my_err("my_log() cannot open logging file " + file_name);
            return -1;
        }

        // std::endl flushes, so a failed write shows up in the stream state here.
        fileStream << str << std::endl;
        if (!fileStream)
        {
            my_err("my_log() cannot write to logging file " + file_name);
            return -1;
        }

        // The ofstream destructor closes the file.
        return 0;
    }
    Last edited by vincegata; February 26th, 2012 at 08:50 PM.

  2. #2
    Join Date
    Jul 2005
    Location
    Netherlands
    Posts
    2,042

    Re: I need some help with mutex and condition variable.

    Quote Originally Posted by vincegata View Post
    So, the first thread generates a float number and adds it to a global vector. Thread two finds an average of all data in the vector. I expect from the program on iteration one: first thread add data to vector, then second thread calculate the average; iteration two: do the same; etc. However here is the log from the second thread:
    And what exactly is the point of having two threads that are constantly waiting for one another? Why not just do everything in the same thread then?

    The entire loop body in each of your threads is synchronized, so the only thing these two threads can be doing simultaneously is increment the loop counter.
    Cheers, D Drmmr

    Please put [code][/code] tags around your code to preserve indentation and make it more readable.

    As long as man ascribes to himself what is merely a possibility, he will not work for the attainment of it. - P. D. Ouspensky

  3. #3
    Join Date
    Jan 2012
    Location
    USA
    Posts
    91

    Re: I need some help with mutex and condition variable.

    receive_data() runs in real time while process_data() can be slower, not running every time new price arrives.

    Someone else helped me with this so it's resolved. Thank you for your input.

  4. #4
    Join Date
    Nov 2003
    Posts
    1,902

    Re: [RESOLVED] I need some help with mutex and condition variable.

    >> // To simulate delay.
    The compiler will just remove that code completely. Call sleep/nanosleep for a real delay.

    You are not using condition variables properly. When waiting on a condition variable it should be done within a predicate loop. This is needed to handle spurious wakeups. It also handles cases where you don't need to wait at all, since the predicate condition is already true.

    In this case your predicate could be "new data has arrived since the last calculation". Assuming the vector always grows, you could keep track of the vector size at the last calculation:
    Code:
            pthread_mutex_lock( &data_mutex );
            while (data.size() == last_size)
                pthread_cond_wait( &data_condition, &data_mutex );
    gg
    Last edited by Codeplug; February 24th, 2012 at 10:49 AM. Reason: wrong logic

  5. #5
    Join Date
    Jan 2012
    Location
    USA
    Posts
    91

    Re: [RESOLVED] I need some help with mutex and condition variable.

    Thank you for the input, Codeplug! I've updated the code.

    I didn't see 'spurious wakeup' in Stevens & Rago.

  6. #6
    Join Date
    Jan 2012
    Location
    USA
    Posts
    91

    Re: [RESOLVED] I need some help with mutex and condition variable.

    It's just occurred to me that I cannot use vector.size() to check if the new data has arrived. The size of the vector will be limited (say 1000 elements). I'll probably use queue instead of vector to disregard the oldest data and add the new ones. I am thinking I can set a global flag once the new data arrive.

    Thanks again.

  7. #7
    Join Date
    Nov 2003
    Posts
    1,902

    Re: [RESOLVED] I need some help with mutex and condition variable.

    The consumer thread could just remove from the container everything that it processes. Then the predicate becomes "while (data.empty())".

    gg

  8. #8
    Join Date
    Jan 2012
    Location
    USA
    Posts
    91

    Re: [RESOLVED] I need some help with mutex and condition variable.

    I cannot empty the container in process_data() because it's global and I add data to it one by one, so there is always data in it. The global flag looks like it is working.

  9. #9
    Join Date
    Nov 2003
    Posts
    1,902

    Re: [RESOLVED] I need some help with mutex and condition variable.

    Who else needs the data besides process_data()? If it is the only consumer of the data, then it can remove the data and throw it away once it's no longer needed.

    >> Global flag looks like is working.
    Post your code for a review if you want to know for sure.

    gg

  10. #10
    Join Date
    Jan 2012
    Location
    USA
    Posts
    91

    Re: [RESOLVED] I need some help with mutex and condition variable.

    I do not think my code is doing what I want it to do. I want the receive() thread to work in real time, as fast as possible, and not to wait for the process() thread. The process() thread can work slower than receive(), so it does not have to do its calculations every time a new quote is received. So, I inserted a 20-millisecond sleep() (it can be longer but the result is the same) into the process() thread function to simulate calculations; however, it slows down the whole process - all threads. I'd expect the receive() thread to finish first and the process() thread not to be able to run every time I add a new quote to the deque.

    The output right now:
    process_quotes - size: 1 *
    process_quotes - size: 2 *
    process_quotes - size: 3 *
    process_quotes - size: 4 *
    process_quotes - size: 5 *
    process_quotes - size: 6 *
    process_quotes - size: 7 *
    process_quotes - size: 8 *
    process_quotes - size: 9 *
    process_quotes - size: 10 *
    ................................
    process_quotes - size: 50 *


    Because I added sleep in process thread function, I expect the output to be something like:
    process_quotes - size: 1 *
    process_quotes - size: 4 *
    process_quotes - size: 7 *
    process_quotes - size: 10 *
    ................................
    process_quotes - size: 50 *


    Thank you.
    Last edited by vincegata; February 26th, 2012 at 08:51 PM.

  11. #11
    Join Date
    Nov 2003
    Posts
    1,902

    Re: [RESOLVED] I need some help with mutex and condition variable.

    >> So, I inserted sleep() 20 milliseconds ...
    You're holding the mutex during the sleep.

    You still don't have a predicate loop. Change the 'if' to 'while'.

    pthread_cancel() doesn't do what you think it does. http://pubs.opengroup.org/onlinepubs...l#tag_15_09_05

    If a thread already has a predicate loop, then you can include the exit condition as part of the predicate. Eg. "while it's not time to exit and there's no data to process, wait".

    gg

  12. #12
    Join Date
    Jan 2012
    Location
    USA
    Posts
    91

    Re: [RESOLVED] I need some help with mutex and condition variable.

    I updated the code in my first post. It looks like it's working now, but I have two deques, which I bet is not optimal.

    How do I do "while it's not time to exit" ?

  13. #13
    Join Date
    Nov 2003
    Posts
    1,902

    Re: [RESOLVED] I need some help with mutex and condition variable.

    You have the right idea. Instead of making an inefficient copy of the queue, use its swap() method instead. This will also keep you from re-processing data that has already been processed.

    Here is some sample code from another thread that demonstrates the swap() technique, and how to incorporate an exit condition. It uses Boost threads, but they have similar semantics to POSIX threads.
    Code:
    #include <boost/thread/thread.hpp>
    #include <boost/thread/mutex.hpp>
    #include <boost/thread/condition.hpp>
    #include <boost/circular_buffer.hpp>
    
    #include <vector>
    #include <algorithm>
    #include <cstdlib>
    #include <iostream>
    using namespace std;
    
    // Ring buffer of ints used for the shared queue and for the consumer's local batch.
    typedef boost::circular_buffer<int> circular_buffer_t;
    
    const size_t PRODUCE_COUNT = 1000;  // number of values produce() pushes
    const size_t CB_CAPACITY = 32;      // capacity passed to each circular buffer
    
    circular_buffer_t g_q(CB_CAPACITY); // shared queue; accessed while holding g_mux
    boost::mutex g_mux;                 // locked around accesses to g_q and g_bExit
    boost::condition g_q_not_empty;     // notified by produce() after each push and when it is done
    bool g_bExit = false;               // set to true by produce() after its last push
    
    // Defined below: CPU busy-work called by produce() between pushes.
    void random_work();
    
    void produce()
    {
        for (int n = 0; n < PRODUCE_COUNT; ++n)
        {
            {
                boost::unique_lock<boost::mutex> l(g_mux);
                g_q.push_back(n);
                g_q_not_empty.notify_one();
            }//block
    
           random_work(); // slow things down a bit
        }//for
    
        {
            boost::unique_lock<boost::mutex> l(g_mux);
            g_bExit = true;
            g_q_not_empty.notify_all();
        }//block
    }//produce
    
    void consume()
    {
        vector<int> consumed;
        circular_buffer_t q(CB_CAPACITY);
        bool bExit = false;
        while (!bExit)
        {
            {
                boost::unique_lock<boost::mutex> l(g_mux);
                while (g_q.empty() && !g_bExit)
                    g_q_not_empty.wait(l);
    
                bExit = g_bExit;
                q.swap(g_q);
            }//block
    
            cout << "Consuming Q of size " << q.size() << endl;
            consumed.insert(consumed.end(), q.begin(), q.end());
            q.clear();
        }//while
    
        cout << "Produced " << PRODUCE_COUNT << " elements" << endl;
        cout << "Consumed " << consumed.size() << " elements" << endl;
    }//consume
    
    int main()
    {
        boost::thread consumer(consume);
        boost::thread producer(produce);
    
        consumer.join();
        producer.join();
    
        return 0;
    }//main
    
    // Busy-work used to slow the producer down: fills a vector of
    // rand() % 1024 elements with rand() values and sorts it.
    // The result is discarded.
    void random_work()
    {
        const std::size_t count = static_cast<std::size_t>(rand() % 1024);
        std::vector<int> scratch(count);

        for (std::vector<int>::iterator it = scratch.begin(); it != scratch.end(); ++it)
            *it = rand();

        std::sort(scratch.begin(), scratch.end());
    }//random_work
    Notes:
    - boost::unique_lock<> is an RAII object that acquires a mutex in its constructor, and releases it in its destructor.
    - boost::condition::notify_one() == pthread_cond_signal()
    - boost::condition::notify_all() == pthread_cond_broadcast()

    gg

  14. #14
    Join Date
    Jan 2012
    Location
    USA
    Posts
    91

    Re: [RESOLVED] I need some help with mutex and condition variable.

    Thank you for your help with this!

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •  





Click Here to Expand Forum to Full Width

Featured