Code::Blocks Forums

Developer forums (C::B DEVELOPMENT STRICTLY!) => Development => Topic started by: ollydbg on February 05, 2014, 07:02:57 am

Title: Please help to review the code in cbThreadPool
Post by: ollydbg on February 05, 2014, 07:02:57 am
While reviewing the code, I found two issues:
1. m_taskAdded never goes back to false once it has been set to true.
Code
// this thread is finishing the task, and is going to be idle.
// if there is no task left, and the total threads is running is 0, then we have all task done
// otherwise, just put me to the idle mode by m_semaphore->Post()
bool cbThreadPool::WaitingThread()
{
  wxMutexLocker lock(m_Mutex);
  --m_workingThreads;

  if (m_workingThreads <= 0 && m_tasksQueue.empty())
  {
    // Sends cbEVT_THREADTASK_ALLDONE message only the real task is all done
    // FIXME (ollydbg#1#): Look at the variable m_taskAdded, it was initialized as false in pool's
    // constructor, and the only chance it was set true is in AddTask() function, I don't find any
    // code to reset this value to false, is that true logic?
    if (m_taskAdded)
    {
      // notify the owner that all tasks are done
      CodeBlocksEvent evt = CodeBlocksEvent(cbEVT_THREADTASK_ALLDONE, m_ID);
      wxPostEvent(m_pOwner, evt);
    }
See my FIXME comment in the code above.
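To make the question concrete, here is a minimal sketch of what resetting the flag could look like. It is a stand-alone model, not the real cbThreadPool and not necessarily what was later committed to trunk: the class name PoolModel, the TakeTask() helper, and the bool return value standing in for the cbEVT_THREADTASK_ALLDONE event are all assumptions made for illustration, and std::mutex replaces wxMutex so the snippet builds without wxWidgets.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>

// Hypothetical, reduced model of the pool: only the counters and the flag.
class PoolModel
{
public:
    // Like AddTask(): queue one task and remember that real work was added.
    void AddTask()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        ++m_queued;
        m_taskAdded = true;
    }

    // Like WorkingThread(): a worker has been woken up.
    void WorkingThread()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        ++m_working;
    }

    // Hypothetical helper: returns true if a task was taken from the queue.
    bool TakeTask()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_queued == 0)
            return false;
        --m_queued;
        return true;
    }

    // Like WaitingThread(): returns true when the "all done" event would be posted.
    bool WaitingThread()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        assert(m_working > 0);
        --m_working;
        if (m_working <= 0 && m_queued == 0 && m_taskAdded)
        {
            m_taskAdded = false; // assumed fix: reset once the event is sent
            return true;
        }
        return false;
    }

private:
    std::mutex  m_mutex;
    std::size_t m_queued = 0;
    int         m_working = 0;
    bool        m_taskAdded = false;
};
```

In this model, a worker that runs the only task gets true from WaitingThread(). If a worker is then woken with an empty queue (for example by an extra Post()), WaitingThread() returns false, whereas without the reset it would report "all done" a second time.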


2. cbThreadPool::AwakeNeeded() makes a lot of Post() calls, but I don't think that many are necessary, right?
Code
inline void cbThreadPool::AwakeNeeded()
{
  if (m_concurrentThreads == -1)
    return;
  // TODO (ollydbg#1#): why?
  // I think this can be optimized, for example, the m_concurrentThreads = 3, and the m_tasksQueue.size() == 5
  // m_workingThreads = 2, do we need to call Post() function 5 times? I think 1 is enough.
  // if the m_workingThreads = 0, then I think 3 is enough.
  for (std::size_t i = 0; i < m_tasksQueue.size(); ++i)
    m_semaphore->Post();
}
See the TODO comment above. The code was changed in:
Code
Revision: cc3a66de85dd9d73b664e298f35523b69f17ce81
Author: ceniza <ceniza@2a5c6006-c6dd-42ca-98ab-0921f2732cef>
Date: 2006-5-21 10:41:21
Message:
* Replaced the condition with a semaphore (just like the previous implementation).

git-svn-id: http://svn.code.sf.net/p/codeblocks/code/trunk@2480 2a5c6006-c6dd-42ca-98ab-0921f2732cef
----
Modified: src/sdk/cbthreadpool.cpp
Modified: src/sdk/cbthreadpool.h



I added many comments to these classes to help explain the logic; see the patch file below.
I would like to hear comments from other devs or users on these issues, thanks.

Code
bbfb0cce6e43a2d649c784b8b82158f38d19ded2
 src/include/cbthreadpool.h | 48 +++++++++++++++++++++++++++-------------------
 src/sdk/cbthreadpool.cpp   | 19 ++++++++++++------
 2 files changed, 41 insertions(+), 26 deletions(-)

diff --git a/src/include/cbthreadpool.h b/src/include/cbthreadpool.h
index a06fb52..4a5ae17 100644
--- a/src/include/cbthreadpool.h
+++ b/src/include/cbthreadpool.h
@@ -70,7 +70,7 @@ class DLLIMPORT cbThreadPool
     /** Begin a batch process
       *
       * @note EVIL: Call it if you want to add all tasks first and get none executed yet.
-      * If you DON'T call it, taks will be executed as you add them (in fact it's what
+      * If you DON'T call it, tasks will be executed as you add them (in fact it's what
       * one would expect).
       */
     void BatchBegin();
@@ -101,7 +101,7 @@ class DLLIMPORT cbThreadPool
         T *operator -> () const throw();
 
       private:
-        void dispose();
+        void dispose(); //decrease the counter, and if it get 0, destroy both counter and ptr
     };
 
     /** A Worker Thread class.
@@ -115,7 +115,8 @@ class DLLIMPORT cbThreadPool
         /** cbWorkerThread ctor
           *
           * @param pool Thread Pool this Worker Thread belongs to
-          * @param semaphore Used to synchronise the Worker Threads
+          * @param semaphore Used to synchronize the Worker Threads, it is a reference to the CountedPtr
+          * object
           */
         cbWorkerThread(cbThreadPool *pool, CountedPtr<wxSemaphore> &semaphore);
 
@@ -137,9 +138,10 @@ class DLLIMPORT cbThreadPool
       private:
         bool m_abort;
         cbThreadPool *m_pPool;
+        // a counted semaphore shared with all the cbWorkerThread
         CountedPtr<wxSemaphore> m_semaphore;
         cbThreadedTask *m_pTask;
-        wxMutex m_taskMutex;
+        wxMutex m_taskMutex;// to protect the member variable accessing from multiply threads
     };
 
     typedef std::vector<cbWorkerThread *> WorkerThreadsArray;
@@ -170,26 +172,27 @@ class DLLIMPORT cbThreadPool
 
     typedef std::list<cbThreadedTaskElement> TasksQueue;
 
-    wxEvtHandler *m_pOwner;
-    int m_ID;
-    bool m_batching;
+    wxEvtHandler *m_pOwner; // events notification will send to this guy
+    int m_ID;           // id used to fill the ID field of the event
+    bool m_batching;    // whether in batch mode of adding tasks
 
-    int m_concurrentThreads; // current number of concurrent threads
+    int m_concurrentThreads; // current number of concurrent threads, this is the maximum value of the m_workingThreads
     unsigned int m_stackSize; // stack size for every threads
     int m_concurrentThreadsSchedule; // if we cannot apply the new value of concurrent threads, keep it here
-    WorkerThreadsArray m_threads; // the working threads are stored here
-    TasksQueue m_tasksQueue; // and the pending tasks here
+    WorkerThreadsArray m_threads; // the working threads(cbWorkerThread) are stored here
+    TasksQueue m_tasksQueue; // and the pending tasks (cbThreadedTaskElement) here
     bool m_taskAdded; // true if any task added
 
-    int m_workingThreads; // how many working threads are running a task
+    int m_workingThreads; // how many working threads are running tasks
 
-    mutable wxMutex m_Mutex; // we better be safe
+    mutable wxMutex m_Mutex; // we better be safe, protect the change of member variables
 
-    CountedPtr<wxSemaphore> m_semaphore; // used to synchronise the Worker Threads
+    CountedPtr<wxSemaphore> m_semaphore; // used to synchronize the Worker Threads, the counted value is that
+    // how many threads are sharing this semaphore
 
     void _SetConcurrentThreads(int concurrentThreads); // like SetConcurrentThreads, but non-thread safe
     void Broadcast(); // awakes all threads
-    void AwakeNeeded(); // awakes only a few threads
+    void AwakeNeeded(); // awakes only a few threads // TODO (ollydbg#1#): ?
 
   protected:
     friend class cbWorkerThread;
@@ -200,16 +203,18 @@ class DLLIMPORT cbThreadPool
       */
     cbThreadedTaskElement GetNextTask();
 
-    /// Mechanism for the threads to tell the Pool they're running
+    /// Mechanism for the threads to tell the Pool they're running, a thread is switch from the idle
+    /// mode to working mode. This is triggered by semaphore released somewhere
     void WorkingThread();
 
-    /** Mechanism for the threads to tell the Pool they're done and will wait
+    /** Mechanism for the threads to tell the Pool they're done and will go to idle, so we can assign
+      * another task to this thread.
       *
       * @return true if everything is OK, false if we should abort
       */
     bool WaitingThread();
 
-    /** Called by a Worker Thread to inform a task has finished
+    /** Called by a Worker Thread to inform a single task has finished, this will send a cbEVT_THREADTASK_ENDED event
       *
       * @param thread The Worker Thread
       */
@@ -254,9 +259,9 @@ inline void cbThreadPool::BatchBegin()
 
 inline void cbThreadPool::Broadcast()
 {
-  if (m_concurrentThreads == -1)
+  if (m_concurrentThreads == -1) //// TODO (ollydbg#1#): why need such check?
     return;
-
+  // let the idle(pending) worker thread to execute tasks, those worker threads are waiting for semaphore
   for (std::size_t i = 0; i < static_cast<std::size_t>(m_concurrentThreads - m_workingThreads); ++i)
     m_semaphore->Post();
 }
@@ -265,7 +270,10 @@ inline void cbThreadPool::AwakeNeeded()
 {
   if (m_concurrentThreads == -1)
     return;
-
+  // TODO (ollydbg#1#): why?
+  // I think this can be optimized, for example, the m_concurrentThreads = 3, and the m_tasksQueue.size() == 5
+  // m_workingThreads = 2, do we need to call Post() function 5 times? I think 1 is enough.
+  // if the m_workingThreads = 0, then I think 3 is enough.
   for (std::size_t i = 0; i < m_tasksQueue.size(); ++i)
     m_semaphore->Post();
 }
diff --git a/src/sdk/cbthreadpool.cpp b/src/sdk/cbthreadpool.cpp
index 2a4fa1e..b49a991 100644
--- a/src/sdk/cbthreadpool.cpp
+++ b/src/sdk/cbthreadpool.cpp
@@ -47,16 +47,16 @@ void cbThreadPool::SetConcurrentThreads(int concurrentThreads)
   wxMutexLocker lock(m_Mutex);
   _SetConcurrentThreads(concurrentThreads);
 }
-
+// this function is already wrappered by a mutex
 void cbThreadPool::_SetConcurrentThreads(int concurrentThreads)
 {
-  if (!m_workingThreads)
+  if (!m_workingThreads)// if pool is not running (no thread is running)
   {
     std::for_each(m_threads.begin(), m_threads.end(), std::mem_fun(&cbWorkerThread::Abort));
     Broadcast();
     m_threads.clear();
 
-    // set a new Semaphore for the new threads
+    // set a new Semaphore for the new threads, note the max value is the concurrentThreads
     m_semaphore = CountedPtr<wxSemaphore>(new wxSemaphore(0, concurrentThreads));
 
     m_concurrentThreads = concurrentThreads;
@@ -66,7 +66,7 @@ void cbThreadPool::_SetConcurrentThreads(int concurrentThreads)
     {
       m_threads.push_back(new cbWorkerThread(this, m_semaphore));
       m_threads.back()->Create(m_stackSize);
-      m_threads.back()->Run();
+      m_threads.back()->Run(); // this will run cbWorkerThread::Entry()
     }
 
 //    Manager::Get()->GetLogManager()->DebugLog(_T("Concurrent threads for pool set to %d"), m_concurrentThreads);
@@ -85,6 +85,8 @@ void cbThreadPool::AddTask(cbThreadedTask *task, bool autodelete)
   m_tasksQueue.push_back(cbThreadedTaskElement(task, autodelete));
   m_taskAdded = true;
 
+  // we are in batch mode, so no need to awake the idle thread
+  // m_workingThreads < m_concurrentThreads means there are some threads in idle mode (no task assigned)
   if (!m_batching && m_workingThreads < m_concurrentThreads)
     AwakeNeeded();
 }
@@ -118,13 +120,15 @@ cbThreadPool::cbThreadedTaskElement cbThreadPool::GetNextTask()
 
   return element;
 }
-
+// a thread is leaving from idle mode, and run a task
 void cbThreadPool::WorkingThread()
 {
   wxMutexLocker lock(m_Mutex);
   ++m_workingThreads;
 }
-
+// this thread is finishing the task, and is going to be idle.
+// if there is no task left, and the total threads is running is 0, then we have all task done
+// otherwise, just put me to the idle mode by m_semaphore->Post()
 bool cbThreadPool::WaitingThread()
 {
   wxMutexLocker lock(m_Mutex);
@@ -133,6 +137,9 @@ bool cbThreadPool::WaitingThread()
   if (m_workingThreads <= 0 && m_tasksQueue.empty())
   {
     // Sends cbEVT_THREADTASK_ALLDONE message only the real task is all done
+    // FIXME (ollydbg#1#): Look at the variable m_taskAdded, it was initialized as false in pool's
+    // constructor, and the only chance it was set true is in AddTask() function, I don't find any
+    // code to reset this value to false, is that true logic?
     if (m_taskAdded)
     {
       // notify the owner that all tasks are done


Title: Re: Please help to review the code in cbThreadPool
Post by: ollydbg on February 05, 2014, 07:12:53 am
Looking at rev 2480, it changed AddTask() from:
Code
void cbThreadPool::AddTask(cbThreadedTask *task, bool autodelete)
{
  if (!task)
  {
    return;
  }

  wxMutexLocker lock(m_Mutex);

  m_tasksQueue.push_back(cbThreadedTaskElement(task, autodelete));

  if (!m_batching && m_workingThreads < m_concurrentThreads)
  {
    for (std::size_t i = 0; i < static_cast<std::size_t>(m_concurrentThreads - m_workingThreads) && i < m_tasksQueue.size(); ++i)
    {
      m_condMutex->Signal();
    }
  }
}

to

Code
void cbThreadPool::AddTask(cbThreadedTask *task, bool autodelete)
{
  if (!task)
  {
    return;
  }

  wxMutexLocker lock(m_Mutex);

  m_tasksQueue.push_back(cbThreadedTaskElement(task, autodelete));

  if (!m_batching && m_workingThreads < m_concurrentThreads)
  {
    AwakeNeeded();
  }
}

I personally think that the old loop condition is correct:
Code
    for (std::size_t i = 0; i < static_cast<std::size_t>(m_concurrentThreads - m_workingThreads) && i < m_tasksQueue.size(); ++i)
    {
      m_condMutex->Signal();
    }

Look, it considers two conditions:
First condition:  i < the number of idle threads (m_concurrentThreads - m_workingThreads)
Second condition:  i < m_tasksQueue.size()

But why did that commit reduce those two conditions to one?
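To compare the two variants in numbers, here is a small sketch that only counts how many wake-ups each loop would send. The function names are made up for illustration, no semaphore or condition is involved, and the assert mirrors the check AddTask() performs before signalling (m_workingThreads < m_concurrentThreads).

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper: wake-ups sent by the older loop, which stops at
// whichever is smaller, the number of idle threads or the queue size.
std::size_t WakeupsBothConditions(int concurrentThreads, int workingThreads,
                                  std::size_t queuedTasks)
{
    assert(workingThreads < concurrentThreads); // same guard as in AddTask()
    std::size_t count = 0;
    for (std::size_t i = 0;
         i < static_cast<std::size_t>(concurrentThreads - workingThreads) && i < queuedTasks;
         ++i)
    {
        ++count; // stands in for one Signal()/Post() call
    }
    return count;
}

// Hypothetical helper: wake-ups sent by AwakeNeeded(), one per queued task.
std::size_t WakeupsQueueOnly(std::size_t queuedTasks)
{
    std::size_t count = 0;
    for (std::size_t i = 0; i < queuedTasks; ++i)
        ++count; // stands in for one Post() call
    return count;
}
```

With m_concurrentThreads = 3, m_workingThreads = 2 and 5 queued tasks, the two-condition loop sends 1 wake-up and the queue-only loop sends 5. With m_workingThreads = 0 the numbers are 3 and 5.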
Title: Re: Please help to review the code in cbThreadPool
Post by: ollydbg on June 01, 2014, 11:08:42 am
All the issues are fixed in trunk now.
Title: Re: Please help to review the code in cbThreadPool
Post by: oBFusCATed on June 01, 2014, 11:47:19 am
BTW, it is more readable to do
Code
awakeThreadNumber = std::min<size_t>(m_tasksQueue.size(), m_concurrentThreads - m_workingThreads);
awakeThreadNumber = std::min<TaskQueueType::size_type>(m_tasksQueue.size(), m_concurrentThreads - m_workingThreads);

instead of:
Code
awakeThreadNumber = std::min(m_tasksQueue.size(), static_cast<std::size_t>(m_concurrentThreads - m_workingThreads));
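For context on why either spelling needs a type hint: std::min takes both arguments as the same type T, so mixing a std::size_t with an int fails template deduction unless T is given explicitly or one argument is cast. The sketch below wraps the explicit-template form in a free function; the function name and the std::list<int> stand-in for the task queue are assumptions for illustration only.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <list>

// Hypothetical stand-alone version of the computation; tasksQueue holds ints
// here only so the snippet compiles without the Code::Blocks SDK.
std::size_t AwakeThreadNumber(const std::list<int>& tasksQueue,
                              int concurrentThreads, int workingThreads)
{
    assert(concurrentThreads > 0); // the real code returns early on the -1 sentinel

    // std::min(tasksQueue.size(), concurrentThreads - workingThreads) would not
    // compile: T cannot be deduced as both std::size_t and int.
    // Naming T explicitly converts the int difference to std::size_t.
    return std::min<std::size_t>(tasksQueue.size(),
                                 concurrentThreads - workingThreads);
}
```

One thing to keep in mind with both forms: if the difference were ever negative, the conversion to std::size_t wraps to a very large value, so the result silently becomes the queue size. In AddTask() the m_workingThreads < m_concurrentThreads check rules that case out.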
Title: Re: Please help to review the code in cbThreadPool
Post by: ollydbg on June 01, 2014, 12:47:10 pm
Thanks, done in rev 9791.