Topic: Please help to review the code in cbThreadPool

Offline ollydbg

  • Developer
  • Lives here!
  • *****
  • Posts: 6077
  • OpenCV and Robotics
    • Chinese OpenCV forum moderator
Please help to review the code in cbThreadPool
« on: February 05, 2014, 07:02:57 am »
While reviewing the code, I found two issues:
1. m_taskAdded is never reset to false once it has been set to true.
Code
// this thread is finishing the task, and is going to be idle.
// if there is no task left, and the total threads is running is 0, then we have all task done
// otherwise, just put me to the idle mode by m_semaphore->Post()
bool cbThreadPool::WaitingThread()
{
  wxMutexLocker lock(m_Mutex);
  --m_workingThreads;

  if (m_workingThreads <= 0 && m_tasksQueue.empty())
  {
    // Sends cbEVT_THREADTASK_ALLDONE message only the real task is all done
    // FIXME (ollydbg#1#): Look at the variable m_taskAdded, it was initialized as false in pool's
    // constructor, and the only chance it was set true is in AddTask() function, I don't find any
    // code to reset this value to false, is that true logic?
    if (m_taskAdded)
    {
      // notify the owner that all tasks are done
      CodeBlocksEvent evt = CodeBlocksEvent(cbEVT_THREADTASK_ALLDONE, m_ID);
      wxPostEvent(m_pOwner, evt);
    }
See my FIXME comment in the code above.
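To make the concern concrete, here is a minimal sketch of one possible fix, assuming the flag should be re-armed each time the "all done" notification goes out. PoolModel is a hypothetical stand-in for cbThreadPool: it models only the counters, uses std::mutex instead of wxMutex, and returns true where the real code would post cbEVT_THREADTASK_ALLDONE. Whether the real fix looks like this is not shown in this thread.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical stand-in for cbThreadPool: only the counters are modelled,
// no wxWidgets types and no real threads.
class PoolModel
{
public:
    PoolModel() : m_pendingTasks(0), m_workingThreads(0), m_taskAdded(false) {}

    void AddTask()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        ++m_pendingTasks;
        m_taskAdded = true;
    }

    // Returns false when the queue is empty.
    bool GetNextTask()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_pendingTasks == 0)
            return false;
        --m_pendingTasks;
        return true;
    }

    void WorkingThread()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        ++m_workingThreads;
    }

    // Returns true where the real code would post cbEVT_THREADTASK_ALLDONE.
    bool WaitingThread()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        assert(m_workingThreads > 0);
        --m_workingThreads;
        if (m_workingThreads == 0 && m_pendingTasks == 0 && m_taskAdded)
        {
            m_taskAdded = false; // assumed fix: re-arm the flag for the next batch
            return true;
        }
        return false;
    }

private:
    std::mutex m_mutex;
    int m_pendingTasks;
    int m_workingThreads;
    bool m_taskAdded;
};
```

In this model, a worker that wakes up, finds the queue empty and goes idle again reports nothing, while every new batch of tasks still ends with exactly one notification.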


2. cbThreadPool::AwakeNeeded() calls Post() once for every queued task, but I don't think that many calls are necessary, right?
Code
inline void cbThreadPool::AwakeNeeded()
{
  if (m_concurrentThreads == -1)
    return;
  // TODO (ollydbg#1#): why?
  // I think this can be optimized, for example, the m_concurrentThreads = 3, and the m_tasksQueue.size() == 5
  // m_workingThreads = 2, do we need to call Post() function 5 times? I think 1 is enough.
  // if the m_workingThreads = 0, then I think 3 is enough.
  for (std::size_t i = 0; i < m_tasksQueue.size(); ++i)
    m_semaphore->Post();
}
See my TODO comment above. The code was changed in this commit:
Code
Revision: cc3a66de85dd9d73b664e298f35523b69f17ce81
Author: ceniza <ceniza@2a5c6006-c6dd-42ca-98ab-0921f2732cef>
Date: 2006-5-21 10:41:21
Message:
* Replaced the condition with a semaphore (just like the previous implementation).

git-svn-id: http://svn.code.sf.net/p/codeblocks/code/trunk@2480 2a5c6006-c6dd-42ca-98ab-0921f2732cef
----
Modified: src/sdk/cbthreadpool.cpp
Modified: src/sdk/cbthreadpool.h
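As a rough illustration of why the extra Post() calls in AwakeNeeded() look wasteful: the pool creates its semaphore as wxSemaphore(0, concurrentThreads), and as far as I understand the wxSemaphore documentation, a Post() that would push the count past that maximum fails with wxSEMA_OVERFLOW. The sketch below is a single-threaded model only; BoundedCountModel and CountRejectedPosts are hypothetical names, and it assumes no worker consumes the semaphore between the posts.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical single-threaded model of a counting semaphore with a maximum.
// It does not block and is not thread safe; it only tracks the counter.
class BoundedCountModel
{
public:
    explicit BoundedCountModel(int maxCount) : m_count(0), m_max(maxCount)
    {
        assert(maxCount > 0);
    }

    // Returns false when the post would exceed the maximum
    // (the case reported as an overflow error by a bounded semaphore).
    bool Post()
    {
        if (m_count >= m_max)
            return false;
        ++m_count;
        return true;
    }

    int Count() const { return m_count; }

private:
    int m_count;
    int m_max;
};

// Posts `times` times and returns how many of those posts were rejected.
int CountRejectedPosts(BoundedCountModel &sem, std::size_t times)
{
    int rejected = 0;
    for (std::size_t i = 0; i < times; ++i)
    {
        if (!sem.Post())
            ++rejected;
    }
    return rejected;
}
```

With a maximum of 3 and 5 queued tasks, two of the five posts in this model achieve nothing, which matches the numbers in the TODO comment.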



I added many comments to those classes to help understand the logic; see the patch below.
I would like to hear comments on these issues from other devs or users, thanks.

Code
bbfb0cce6e43a2d649c784b8b82158f38d19ded2
 src/include/cbthreadpool.h | 48 +++++++++++++++++++++++++++-------------------
 src/sdk/cbthreadpool.cpp   | 19 ++++++++++++------
 2 files changed, 41 insertions(+), 26 deletions(-)

diff --git a/src/include/cbthreadpool.h b/src/include/cbthreadpool.h
index a06fb52..4a5ae17 100644
--- a/src/include/cbthreadpool.h
+++ b/src/include/cbthreadpool.h
@@ -70,7 +70,7 @@ class DLLIMPORT cbThreadPool
     /** Begin a batch process
       *
       * @note EVIL: Call it if you want to add all tasks first and get none executed yet.
-      * If you DON'T call it, taks will be executed as you add them (in fact it's what
+      * If you DON'T call it, tasks will be executed as you add them (in fact it's what
       * one would expect).
       */
     void BatchBegin();
@@ -101,7 +101,7 @@ class DLLIMPORT cbThreadPool
         T *operator -> () const throw();
 
       private:
-        void dispose();
+        void dispose(); //decrease the counter, and if it get 0, destroy both counter and ptr
     };
 
     /** A Worker Thread class.
@@ -115,7 +115,8 @@ class DLLIMPORT cbThreadPool
         /** cbWorkerThread ctor
           *
           * @param pool Thread Pool this Worker Thread belongs to
-          * @param semaphore Used to synchronise the Worker Threads
+          * @param semaphore Used to synchronize the Worker Threads, it is a reference to the CountedPtr
+          * object
           */
         cbWorkerThread(cbThreadPool *pool, CountedPtr<wxSemaphore> &semaphore);
 
@@ -137,9 +138,10 @@ class DLLIMPORT cbThreadPool
       private:
         bool m_abort;
         cbThreadPool *m_pPool;
+        // a counted semaphore shared with all the cbWorkerThread
         CountedPtr<wxSemaphore> m_semaphore;
         cbThreadedTask *m_pTask;
-        wxMutex m_taskMutex;
+        wxMutex m_taskMutex;// to protect the member variable accessing from multiply threads
     };
 
     typedef std::vector<cbWorkerThread *> WorkerThreadsArray;
@@ -170,26 +172,27 @@ class DLLIMPORT cbThreadPool
 
     typedef std::list<cbThreadedTaskElement> TasksQueue;
 
-    wxEvtHandler *m_pOwner;
-    int m_ID;
-    bool m_batching;
+    wxEvtHandler *m_pOwner; // events notification will send to this guy
+    int m_ID;           // id used to fill the ID field of the event
+    bool m_batching;    // whether in batch mode of adding tasks
 
-    int m_concurrentThreads; // current number of concurrent threads
+    int m_concurrentThreads; // current number of concurrent threads, this is the maximum value of the m_workingThreads
     unsigned int m_stackSize; // stack size for every threads
     int m_concurrentThreadsSchedule; // if we cannot apply the new value of concurrent threads, keep it here
-    WorkerThreadsArray m_threads; // the working threads are stored here
-    TasksQueue m_tasksQueue; // and the pending tasks here
+    WorkerThreadsArray m_threads; // the working threads(cbWorkerThread) are stored here
+    TasksQueue m_tasksQueue; // and the pending tasks (cbThreadedTaskElement) here
     bool m_taskAdded; // true if any task added
 
-    int m_workingThreads; // how many working threads are running a task
+    int m_workingThreads; // how many working threads are running tasks
 
-    mutable wxMutex m_Mutex; // we better be safe
+    mutable wxMutex m_Mutex; // we better be safe, protect the change of member variables
 
-    CountedPtr<wxSemaphore> m_semaphore; // used to synchronise the Worker Threads
+    CountedPtr<wxSemaphore> m_semaphore; // used to synchronize the Worker Threads, the counted value is that
+    // how many threads are sharing this semaphore
 
     void _SetConcurrentThreads(int concurrentThreads); // like SetConcurrentThreads, but non-thread safe
     void Broadcast(); // awakes all threads
-    void AwakeNeeded(); // awakes only a few threads
+    void AwakeNeeded(); // awakes only a few threads // TODO (ollydbg#1#): ?
 
   protected:
     friend class cbWorkerThread;
@@ -200,16 +203,18 @@ class DLLIMPORT cbThreadPool
       */
     cbThreadedTaskElement GetNextTask();
 
-    /// Mechanism for the threads to tell the Pool they're running
+    /// Mechanism for the threads to tell the Pool they're running, a thread is switch from the idle
+    /// mode to working mode. This is triggered by semaphore released somewhere
     void WorkingThread();
 
-    /** Mechanism for the threads to tell the Pool they're done and will wait
+    /** Mechanism for the threads to tell the Pool they're done and will go to idle, so we can assign
+      * another task to this thread.
       *
       * @return true if everything is OK, false if we should abort
       */
     bool WaitingThread();
 
-    /** Called by a Worker Thread to inform a task has finished
+    /** Called by a Worker Thread to inform a single task has finished, this will send a cbEVT_THREADTASK_ENDED event
       *
       * @param thread The Worker Thread
       */
@@ -254,9 +259,9 @@ inline void cbThreadPool::BatchBegin()
 
 inline void cbThreadPool::Broadcast()
 {
-  if (m_concurrentThreads == -1)
+  if (m_concurrentThreads == -1) //// TODO (ollydbg#1#): why need such check?
     return;
-
+  // let the idle(pending) worker thread to execute tasks, those worker threads are waiting for semaphore
   for (std::size_t i = 0; i < static_cast<std::size_t>(m_concurrentThreads - m_workingThreads); ++i)
     m_semaphore->Post();
 }
@@ -265,7 +270,10 @@ inline void cbThreadPool::AwakeNeeded()
 {
   if (m_concurrentThreads == -1)
     return;
-
+  // TODO (ollydbg#1#): why?
+  // I think this can be optimized, for example, the m_concurrentThreads = 3, and the m_tasksQueue.size() == 5
+  // m_workingThreads = 2, do we need to call Post() function 5 times? I think 1 is enough.
+  // if the m_workingThreads = 0, then I think 3 is enough.
   for (std::size_t i = 0; i < m_tasksQueue.size(); ++i)
     m_semaphore->Post();
 }
diff --git a/src/sdk/cbthreadpool.cpp b/src/sdk/cbthreadpool.cpp
index 2a4fa1e..b49a991 100644
--- a/src/sdk/cbthreadpool.cpp
+++ b/src/sdk/cbthreadpool.cpp
@@ -47,16 +47,16 @@ void cbThreadPool::SetConcurrentThreads(int concurrentThreads)
   wxMutexLocker lock(m_Mutex);
   _SetConcurrentThreads(concurrentThreads);
 }
-
+// this function is already wrappered by a mutex
 void cbThreadPool::_SetConcurrentThreads(int concurrentThreads)
 {
-  if (!m_workingThreads)
+  if (!m_workingThreads)// if pool is not running (no thread is running)
   {
     std::for_each(m_threads.begin(), m_threads.end(), std::mem_fun(&cbWorkerThread::Abort));
     Broadcast();
     m_threads.clear();
 
-    // set a new Semaphore for the new threads
+    // set a new Semaphore for the new threads, note the max value is the concurrentThreads
     m_semaphore = CountedPtr<wxSemaphore>(new wxSemaphore(0, concurrentThreads));
 
     m_concurrentThreads = concurrentThreads;
@@ -66,7 +66,7 @@ void cbThreadPool::_SetConcurrentThreads(int concurrentThreads)
     {
       m_threads.push_back(new cbWorkerThread(this, m_semaphore));
       m_threads.back()->Create(m_stackSize);
-      m_threads.back()->Run();
+      m_threads.back()->Run(); // this will run cbWorkerThread::Entry()
     }
 
 //    Manager::Get()->GetLogManager()->DebugLog(_T("Concurrent threads for pool set to %d"), m_concurrentThreads);
@@ -85,6 +85,8 @@ void cbThreadPool::AddTask(cbThreadedTask *task, bool autodelete)
   m_tasksQueue.push_back(cbThreadedTaskElement(task, autodelete));
   m_taskAdded = true;
 
+  // we are in batch mode, so no need to awake the idle thread
+  // m_workingThreads < m_concurrentThreads means there are some threads in idle mode (no task assigned)
   if (!m_batching && m_workingThreads < m_concurrentThreads)
     AwakeNeeded();
 }
@@ -118,13 +120,15 @@ cbThreadPool::cbThreadedTaskElement cbThreadPool::GetNextTask()
 
   return element;
 }
-
+// a thread is leaving from idle mode, and run a task
 void cbThreadPool::WorkingThread()
 {
   wxMutexLocker lock(m_Mutex);
   ++m_workingThreads;
 }
-
+// this thread is finishing the task, and is going to be idle.
+// if there is no task left, and the total threads is running is 0, then we have all task done
+// otherwise, just put me to the idle mode by m_semaphore->Post()
 bool cbThreadPool::WaitingThread()
 {
   wxMutexLocker lock(m_Mutex);
@@ -133,6 +137,9 @@ bool cbThreadPool::WaitingThread()
   if (m_workingThreads <= 0 && m_tasksQueue.empty())
   {
     // Sends cbEVT_THREADTASK_ALLDONE message only the real task is all done
+    // FIXME (ollydbg#1#): Look at the variable m_taskAdded, it was initialized as false in pool's
+    // constructor, and the only chance it was set true is in AddTask() function, I don't find any
+    // code to reset this value to false, is that true logic?
     if (m_taskAdded)
     {
       // notify the owner that all tasks are done


« Last Edit: February 05, 2014, 07:05:02 am by ollydbg »
If some piece of memory should be reused, turn them to variables (or const variables).
If some piece of operations should be reused, turn them to functions.
If they happened together, then turn them to classes.

Offline ollydbg

Re: Please help to review the code in cbThreadPool
« Reply #1 on: February 05, 2014, 07:12:53 am »
Looking at rev 2480, AddTask() was changed from:
Code
void cbThreadPool::AddTask(cbThreadedTask *task, bool autodelete)
{
  if (!task)
  {
    return;
  }

  wxMutexLocker lock(m_Mutex);

  m_tasksQueue.push_back(cbThreadedTaskElement(task, autodelete));

  if (!m_batching && m_workingThreads < m_concurrentThreads)
  {
    for (std::size_t i = 0; i < static_cast<std::size_t>(m_concurrentThreads - m_workingThreads) && i < m_tasksQueue.size(); ++i)
    {
      m_condMutex->Signal();
    }
  }
}

to

Code
void cbThreadPool::AddTask(cbThreadedTask *task, bool autodelete)
{
  if (!task)
  {
    return;
  }

  wxMutexLocker lock(m_Mutex);

  m_tasksQueue.push_back(cbThreadedTaskElement(task, autodelete));

  if (!m_batching && m_workingThreads < m_concurrentThreads)
  {
    AwakeNeeded();
  }
}

I personally think that the loop condition in the older version is correct:
Code
    for (std::size_t i = 0; i < static_cast<std::size_t>(m_concurrentThreads - m_workingThreads) && i < m_tasksQueue.size(); ++i)
    {
      m_condMutex->Signal();
    }

Look, it considers two conditions:
First condition:  i < the number of idle threads (m_concurrentThreads - m_workingThreads)
Second condition:  i < m_tasksQueue.size()

But why did that commit reduce those two conditions to one?
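The two conditions together amount to taking the smaller of "queued tasks" and "idle threads". A minimal sketch of that calculation, assuming a free function with the hypothetical name ThreadsToWake (the real members are m_tasksQueue, m_concurrentThreads and m_workingThreads):

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical helper: how many Post()/Signal() calls are actually useful.
// queuedTasks       ~ m_tasksQueue.size()
// concurrentThreads ~ m_concurrentThreads (-1 means "not set up yet")
// workingThreads    ~ m_workingThreads
std::size_t ThreadsToWake(std::size_t queuedTasks, int concurrentThreads, int workingThreads)
{
    assert(workingThreads >= 0);

    // No idle thread (or pool not configured): nothing to wake.
    if (concurrentThreads <= workingThreads)
        return 0;

    const std::size_t idleThreads =
        static_cast<std::size_t>(concurrentThreads - workingThreads);

    // Wake no more threads than there are tasks, and no more than are idle.
    return std::min(queuedTasks, idleThreads);
}
```

Using the numbers from the TODO comment in the first post: 5 queued tasks, 3 concurrent threads and 2 working threads give 1 wake-up; with 0 working threads the result is 3.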

Offline ollydbg

Re: Please help to review the code in cbThreadPool
« Reply #2 on: June 01, 2014, 11:08:42 am »
All the issues are fixed in trunk now.

Offline oBFusCATed

  • Developer
  • Lives here!
  • *****
  • Posts: 13406
    • Travis build status
Re: Please help to review the code in cbThreadPool
« Reply #3 on: June 01, 2014, 11:47:19 am »
BTW, it is more readable to write either of these:
Code
awakeThreadNumber = std::min<size_t>(m_tasksQueue.size(), m_concurrentThreads - m_workingThreads);
awakeThreadNumber = std::min<TasksQueue::size_type>(m_tasksQueue.size(), m_concurrentThreads - m_workingThreads);

instead of:
Code
awakeThreadNumber = std::min(m_tasksQueue.size(), static_cast<std::size_t>(m_concurrentThreads - m_workingThreads));
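For context, the reason a cast or an explicit template argument is needed at all is that std::min takes two arguments of the same type, and size() returns an unsigned type while the thread counters are int. Below is a small sketch comparing the spellings. The function names are hypothetical, TasksQueue uses a placeholder element type, and the container's size_type is used as the member typedef that matches size().

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <list>

typedef std::list<int> TasksQueue; // placeholder element type for the sketch

// All three variants assume concurrent >= working; a negative difference
// would wrap around when converted to an unsigned type.

// Explicit template argument: the int difference is converted implicitly.
std::size_t WakeCountExplicit(const TasksQueue &queue, int concurrent, int working)
{
    assert(concurrent >= working);
    return std::min<std::size_t>(queue.size(), concurrent - working);
}

// Same idea, spelled with the container's own size type.
std::size_t WakeCountSizeType(const TasksQueue &queue, int concurrent, int working)
{
    assert(concurrent >= working);
    return std::min<TasksQueue::size_type>(queue.size(), concurrent - working);
}

// Cast at the call site: longer, but the conversion is visible.
std::size_t WakeCountCast(const TasksQueue &queue, int concurrent, int working)
{
    assert(concurrent >= working);
    return std::min(queue.size(), static_cast<std::size_t>(concurrent - working));
}
```

All three compute the same value; the difference is only in how visible the int-to-unsigned conversion is to the reader.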
(most of the time I ignore long posts)
[strangers don't send me private messages, I'll ignore them; post a topic in the forum, but first read the rules!]

Offline ollydbg

Re: Please help to review the code in cbThreadPool
« Reply #4 on: June 01, 2014, 12:47:10 pm »
Thanks, done in rev 9791.