While reviewing the code, I found two issues:
1. m_taskAdded is never reset to false once it has been set to true.
// this thread is finishing the task, and is going to be idle.
// if there is no task left, and the total threads is running is 0, then we have all task done
// otherwise, just put me to the idle mode by m_semaphore->Post()
bool cbThreadPool::WaitingThread()
{
wxMutexLocker lock(m_Mutex);
--m_workingThreads;
if (m_workingThreads <= 0 && m_tasksQueue.empty())
{
// Sends cbEVT_THREADTASK_ALLDONE message only the real task is all done
// FIXME (ollydbg#1#): Look at the variable m_taskAdded, it was initialized as false in pool's
// constructor, and the only chance it was set true is in AddTask() function, I don't find any
// code to reset this value to false, is that true logic?
if (m_taskAdded)
{
// notify the owner that all tasks are done
CodeBlocksEvent evt = CodeBlocksEvent(cbEVT_THREADTASK_ALLDONE, m_ID);
wxPostEvent(m_pOwner, evt);
}
See my FIXME comment in the code above.
2. cbThreadPool::AwakeNeeded() makes a lot of Post() calls, but I don't think that is necessary, right?
// Calls m_semaphore->Post() once for every element currently in m_tasksQueue.
// Returns without posting when m_concurrentThreads is -1.
inline void cbThreadPool::AwakeNeeded()
{
if (m_concurrentThreads == -1)
return;
// TODO (ollydbg#1#): why post once per queued task?
// I think this can be optimized. For example, with m_concurrentThreads == 3,
// m_tasksQueue.size() == 5 and m_workingThreads == 2, do we need to call Post()
// 5 times? I think 1 is enough.
// If m_workingThreads == 0, then I think 3 is enough.
for (std::size_t i = 0; i < m_tasksQueue.size(); ++i)
m_semaphore->Post();
}
See the TODO section above. The code was changed in
Revision: cc3a66de85dd9d73b664e298f35523b69f17ce81
Author: ceniza <ceniza@2a5c6006-c6dd-42ca-98ab-0921f2732cef>
Date: 2006-5-21 10:41:21
Message:
* Replaced the condition with a semaphore (just like the previous implementation).
git-svn-id: http://svn.code.sf.net/p/codeblocks/code/trunk@2480 2a5c6006-c6dd-42ca-98ab-0921f2732cef
----
Modified: src/sdk/cbthreadpool.cpp
Modified: src/sdk/cbthreadpool.h
I added many comments to those classes to help explain the logic; see the patch below.
I would like to hear comments from other devs or users on these issues, thanks.
bbfb0cce6e43a2d649c784b8b82158f38d19ded2
src/include/cbthreadpool.h | 48 +++++++++++++++++++++++++++-------------------
src/sdk/cbthreadpool.cpp | 19 ++++++++++++------
2 files changed, 41 insertions(+), 26 deletions(-)
diff --git a/src/include/cbthreadpool.h b/src/include/cbthreadpool.h
index a06fb52..4a5ae17 100644
--- a/src/include/cbthreadpool.h
+++ b/src/include/cbthreadpool.h
@@ -70,7 +70,7 @@ class DLLIMPORT cbThreadPool
/** Begin a batch process
*
* @note EVIL: Call it if you want to add all tasks first and get none executed yet.
- * If you DON'T call it, taks will be executed as you add them (in fact it's what
+ * If you DON'T call it, tasks will be executed as you add them (in fact it's what
* one would expect).
*/
void BatchBegin();
@@ -101,7 +101,7 @@ class DLLIMPORT cbThreadPool
T *operator -> () const throw();
private:
- void dispose();
+ void dispose(); //decrease the counter, and if it get 0, destroy both counter and ptr
};
/** A Worker Thread class.
@@ -115,7 +115,8 @@ class DLLIMPORT cbThreadPool
/** cbWorkerThread ctor
*
* @param pool Thread Pool this Worker Thread belongs to
- * @param semaphore Used to synchronise the Worker Threads
+ * @param semaphore Used to synchronize the Worker Threads, it is a reference to the CountedPtr
+ * object
*/
cbWorkerThread(cbThreadPool *pool, CountedPtr<wxSemaphore> &semaphore);
@@ -137,9 +138,10 @@ class DLLIMPORT cbThreadPool
private:
bool m_abort;
cbThreadPool *m_pPool;
+ // a counted semaphore shared with all the cbWorkerThread
CountedPtr<wxSemaphore> m_semaphore;
cbThreadedTask *m_pTask;
- wxMutex m_taskMutex;
+ wxMutex m_taskMutex;// to protect the member variable accessing from multiply threads
};
typedef std::vector<cbWorkerThread *> WorkerThreadsArray;
@@ -170,26 +172,27 @@ class DLLIMPORT cbThreadPool
typedef std::list<cbThreadedTaskElement> TasksQueue;
- wxEvtHandler *m_pOwner;
- int m_ID;
- bool m_batching;
+ wxEvtHandler *m_pOwner; // events notification will send to this guy
+ int m_ID; // id used to fill the ID field of the event
+ bool m_batching; // whether in batch mode of adding tasks
- int m_concurrentThreads; // current number of concurrent threads
+ int m_concurrentThreads; // current number of concurrent threads, this is the maximum value of the m_workingThreads
unsigned int m_stackSize; // stack size for every threads
int m_concurrentThreadsSchedule; // if we cannot apply the new value of concurrent threads, keep it here
- WorkerThreadsArray m_threads; // the working threads are stored here
- TasksQueue m_tasksQueue; // and the pending tasks here
+ WorkerThreadsArray m_threads; // the working threads(cbWorkerThread) are stored here
+ TasksQueue m_tasksQueue; // and the pending tasks (cbThreadedTaskElement) here
bool m_taskAdded; // true if any task added
- int m_workingThreads; // how many working threads are running a task
+ int m_workingThreads; // how many working threads are running tasks
- mutable wxMutex m_Mutex; // we better be safe
+ mutable wxMutex m_Mutex; // we better be safe, protect the change of member variables
- CountedPtr<wxSemaphore> m_semaphore; // used to synchronise the Worker Threads
+ CountedPtr<wxSemaphore> m_semaphore; // used to synchronize the Worker Threads, the counted value is that
+ // how many threads are sharing this semaphore
void _SetConcurrentThreads(int concurrentThreads); // like SetConcurrentThreads, but non-thread safe
void Broadcast(); // awakes all threads
- void AwakeNeeded(); // awakes only a few threads
+ void AwakeNeeded(); // awakes only a few threads // TODO (ollydbg#1#): ?
protected:
friend class cbWorkerThread;
@@ -200,16 +203,18 @@ class DLLIMPORT cbThreadPool
*/
cbThreadedTaskElement GetNextTask();
- /// Mechanism for the threads to tell the Pool they're running
+ /// Mechanism for the threads to tell the Pool they're running, a thread is switch from the idle
+ /// mode to working mode. This is triggered by semaphore released somewhere
void WorkingThread();
- /** Mechanism for the threads to tell the Pool they're done and will wait
+ /** Mechanism for the threads to tell the Pool they're done and will go to idle, so we can assign
+ * another task to this thread.
*
* @return true if everything is OK, false if we should abort
*/
bool WaitingThread();
- /** Called by a Worker Thread to inform a task has finished
+ /** Called by a Worker Thread to inform a single task has finished, this will send a cbEVT_THREADTASK_ENDED event
*
* @param thread The Worker Thread
*/
@@ -254,9 +259,9 @@ inline void cbThreadPool::BatchBegin()
inline void cbThreadPool::Broadcast()
{
- if (m_concurrentThreads == -1)
+ if (m_concurrentThreads == -1) //// TODO (ollydbg#1#): why need such check?
return;
-
+ // let the idle(pending) worker thread to execute tasks, those worker threads are waiting for semaphore
for (std::size_t i = 0; i < static_cast<std::size_t>(m_concurrentThreads - m_workingThreads); ++i)
m_semaphore->Post();
}
@@ -265,7 +270,10 @@ inline void cbThreadPool::AwakeNeeded()
{
if (m_concurrentThreads == -1)
return;
-
+ // TODO (ollydbg#1#): why?
+ // I think this can be optimized, for example, the m_concurrentThreads = 3, and the m_tasksQueue.size() == 5
+ // m_workingThreads = 2, do we need to call Post() function 5 times? I think 1 is enough.
+ // if the m_workingThreads = 0, then I think 3 is enough.
for (std::size_t i = 0; i < m_tasksQueue.size(); ++i)
m_semaphore->Post();
}
diff --git a/src/sdk/cbthreadpool.cpp b/src/sdk/cbthreadpool.cpp
index 2a4fa1e..b49a991 100644
--- a/src/sdk/cbthreadpool.cpp
+++ b/src/sdk/cbthreadpool.cpp
@@ -47,16 +47,16 @@ void cbThreadPool::SetConcurrentThreads(int concurrentThreads)
wxMutexLocker lock(m_Mutex);
_SetConcurrentThreads(concurrentThreads);
}
-
+// this function is already wrappered by a mutex
void cbThreadPool::_SetConcurrentThreads(int concurrentThreads)
{
- if (!m_workingThreads)
+ if (!m_workingThreads)// if pool is not running (no thread is running)
{
std::for_each(m_threads.begin(), m_threads.end(), std::mem_fun(&cbWorkerThread::Abort));
Broadcast();
m_threads.clear();
- // set a new Semaphore for the new threads
+ // set a new Semaphore for the new threads, note the max value is the concurrentThreads
m_semaphore = CountedPtr<wxSemaphore>(new wxSemaphore(0, concurrentThreads));
m_concurrentThreads = concurrentThreads;
@@ -66,7 +66,7 @@ void cbThreadPool::_SetConcurrentThreads(int concurrentThreads)
{
m_threads.push_back(new cbWorkerThread(this, m_semaphore));
m_threads.back()->Create(m_stackSize);
- m_threads.back()->Run();
+ m_threads.back()->Run(); // this will run cbWorkerThread::Entry()
}
// Manager::Get()->GetLogManager()->DebugLog(_T("Concurrent threads for pool set to %d"), m_concurrentThreads);
@@ -85,6 +85,8 @@ void cbThreadPool::AddTask(cbThreadedTask *task, bool autodelete)
m_tasksQueue.push_back(cbThreadedTaskElement(task, autodelete));
m_taskAdded = true;
+ // we are in batch mode, so no need to awake the idle thread
+ // m_workingThreads < m_concurrentThreads means there are some threads in idle mode (no task assigned)
if (!m_batching && m_workingThreads < m_concurrentThreads)
AwakeNeeded();
}
@@ -118,13 +120,15 @@ cbThreadPool::cbThreadedTaskElement cbThreadPool::GetNextTask()
return element;
}
-
+// a thread is leaving from idle mode, and run a task
void cbThreadPool::WorkingThread()
{
wxMutexLocker lock(m_Mutex);
++m_workingThreads;
}
-
+// this thread is finishing the task, and is going to be idle.
+// if there is no task left, and the total threads is running is 0, then we have all task done
+// otherwise, just put me to the idle mode by m_semaphore->Post()
bool cbThreadPool::WaitingThread()
{
wxMutexLocker lock(m_Mutex);
@@ -133,6 +137,9 @@ bool cbThreadPool::WaitingThread()
if (m_workingThreads <= 0 && m_tasksQueue.empty())
{
// Sends cbEVT_THREADTASK_ALLDONE message only the real task is all done
+ // FIXME (ollydbg#1#): Look at the variable m_taskAdded, it was initialized as false in pool's
+ // constructor, and the only chance it was set true is in AddTask() function, I don't find any
+ // code to reset this value to false, is that true logic?
if (m_taskAdded)
{
// notify the owner that all tasks are done