/***************************************************************************
 *   Copyright (C) 2004-2006 by Ilya A. Volynets-Evenbakh                  *
 *   ilya@total-knowledge.com                                              *
 *                                                                         *
 *   This program is free software; you can redistribute it and/or modify  *
 *   it under the terms of the GNU General Public License as published by  *
 *   the Free Software Foundation; either version 2 of the License, or     *
 *   (at your option) any later version.                                   *
 *                                                                         *
 *   This program is distributed in the hope that it will be useful,       *
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the          *
 *   GNU General Public License for more details.                          *
 *                                                                         *
 *   You should have received a copy of the GNU General Public License     *
 *   along with this program; if not, write to the                         *
 *   Free Software Foundation, Inc.,                                       *
 *   59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
* 00019 ***************************************************************************/ 00020 #include "cthreadpool.h" 00021 #include <iostream> 00022 #include <sptk3/CGuard.h> 00023 00024 namespace container { 00025 00026 CThreadPool::CThreadPool(int numThreads,int queueLen) 00027 : m_maxQueueLen(queueLen) 00028 , m_curQueueLen(0) 00029 , m_running(true) 00030 { 00031 sptk::CGuard guard(m_tlLock); 00032 for(int i=0;i<numThreads;i++){ 00033 Thread* t=new Thread(this); 00034 m_threads.push_front(t); 00035 t->run(); 00036 } 00037 } 00038 00039 00040 CThreadPool::~CThreadPool() 00041 { 00042 stop(); 00043 } 00044 00045 00046 void CThreadPool::queue(CThreadPool::Task* t) 00047 { 00048 sptk::CGuard guard(m_queueLock); 00049 while(m_curQueueLen>=m_maxQueueLen) { 00050 m_queueLock.waitForSignal(); 00051 } 00052 m_queue.push_front(t); 00053 m_curQueueLen++; 00054 #ifdef DEBUG_TP 00055 std::cerr<<__PRETTY_FUNCTION__<<": queue length: "<<m_curQueueLen<<std::endl; 00056 #endif 00057 m_queueLock.broadcastSignalNoLock(); 00058 } 00059 00060 CThreadPool::Task* CThreadPool::getTask() 00061 { 00062 sptk::CGuard guard(m_queueLock); 00063 while(m_curQueueLen==0 && m_running) { //Empty 00064 m_queueLock.waitForSignal(); 00065 } 00066 CThreadPool::Task* t=0; 00067 if(m_running) { 00068 tasklist_t::iterator i=m_queue.end(); 00069 i--;// Presume if curQueueLen !=0, we have some elements... 
00070 t=*i; 00071 m_queue.erase(i); 00072 m_curQueueLen--; 00073 } 00074 #ifdef DEBUG_TP 00075 std::cerr<<__PRETTY_FUNCTION__<<": queue length: "<<m_curQueueLen<<std::endl; 00076 #endif 00077 m_queueLock.broadcastSignalNoLock(); 00078 return t; 00079 } 00080 00086 void CThreadPool::Thread::threadFunction() 00087 { 00088 CThreadPool::Task* t; 00089 m_running=true; 00090 do { 00091 #ifdef DEBUG_TP 00092 std::cerr<<__PRETTY_FUNCTION__<<": thread "<<(void*)this<<" is waiting for a task"<<std::endl; 00093 #endif 00094 t=m_pool->getTask(); 00095 #ifdef DEBUG_TP 00096 std::cerr<<__PRETTY_FUNCTION__<<": thread "<<(void*)this<<" got task "<<(void*)t<<std::endl; 00097 #endif 00098 if(t) { 00099 t->run(); 00100 delete t; 00101 } 00102 #ifdef DEBUG_TP 00103 std::cerr<<__PRETTY_FUNCTION__<<": thread "<<(void*)this<<" finished task "<<(void*)t<<std::endl; 00104 #endif 00105 } while(t); 00106 } 00107 00108 } 00109 00110 00116 void container::CThreadPool::stop() 00117 { 00118 sptk::CGuard guard(m_tlLock); 00119 m_queueLock.lock(); 00120 m_running=false; 00121 m_queueLock.unlock(); 00122 m_queueLock.broadcastSignalNoLock(); 00123 do { 00124 m_tlLock.waitForSignal(); 00125 } while(!m_threads.empty()); 00126 } 00127 00128 00132 void container::CThreadPool::threadDone(CThreadPool::Thread* thread) 00133 { 00134 sptk::CGuard guard(m_tlLock); 00135 for(threadlist_t::iterator it=m_threads.begin(); 00136 it!=m_threads.end(); it++) 00137 { 00138 if(*it==thread) { 00139 //delete *it; 00140 m_threads.erase(it); 00141 break; 00142 } 00143 } 00144 m_tlLock.sendSignalNoLock(); 00145 } 00146 00147 00151 void container::CThreadPool::Thread::onThreadExit() 00152 { 00153 m_pool->threadDone(this); 00154 delete this; 00155 }