CPPSERV


Home Projects Jobs Clientele Contact
CPPSERV Documentation Download TODO Mailing lists Bug tracker News RSS Feed Browse source

cthreadpool.cpp

Go to the documentation of this file.
00001 /***************************************************************************
00002  *   Copyright (C) 2004-2006 by Ilya A. Volynets-Evenbakh                  *
00003  *   ilya@total-knowledge.com                                              *
00004  *                                                                         *
00005  *   This program is free software; you can redistribute it and/or modify  *
00006  *   it under the terms of the GNU General Public License as published by  *
00007  *   the Free Software Foundation; either version 2 of the License, or     *
00008  *   (at your option) any later version.                                   *
00009  *                                                                         *
00010  *   This program is distributed in the hope that it will be useful,       *
00011  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00012  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
00013  *   GNU General Public License for more details.                          *
00014  *                                                                         *
00015  *   You should have received a copy of the GNU General Public License     *
00016  *   along with this program; if not, write to the                         *
00017  *   Free Software Foundation, Inc.,                                       *
00018  *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.             *
00019  ***************************************************************************/
00020 #include "cthreadpool.h"
00021 #include <iostream>
00022 #include <sptk3/CGuard.h>
00023 
00024 namespace container {
00025 
00026 CThreadPool::CThreadPool(int numThreads,int queueLen)
00027     : m_maxQueueLen(queueLen)
00028     , m_curQueueLen(0)
00029     , m_running(true)
00030 {
00031     sptk::CGuard guard(m_tlLock);
00032     for(int i=0;i<numThreads;i++){
00033         Thread* t=new Thread(this);
00034         m_threads.push_front(t);
00035         t->run();
00036     }
00037 }
00038 
00039 
00040 CThreadPool::~CThreadPool()
00041 {
00042     stop();
00043 }
00044 
00045 
00046 void CThreadPool::queue(CThreadPool::Task* t)
00047 {
00048     sptk::CGuard guard(m_queueLock);
00049     while(m_curQueueLen>=m_maxQueueLen) {
00050         m_queueLock.waitForSignal();
00051     }
00052     m_queue.push_front(t);
00053     m_curQueueLen++;
00054 #ifdef DEBUG_TP
00055     std::cerr<<__PRETTY_FUNCTION__<<": queue length: "<<m_curQueueLen<<std::endl;
00056 #endif
00057     m_queueLock.broadcastSignalNoLock();
00058 }
00059 
00060 CThreadPool::Task* CThreadPool::getTask()
00061 {
00062     sptk::CGuard guard(m_queueLock);
00063     while(m_curQueueLen==0 && m_running) { //Empty
00064         m_queueLock.waitForSignal();
00065     }
00066     CThreadPool::Task* t=0;
00067     if(m_running) {
00068         tasklist_t::iterator i=m_queue.end();
00069         i--;// Presume if curQueueLen !=0, we have some elements...
00070         t=*i;
00071         m_queue.erase(i);
00072         m_curQueueLen--;
00073     }
00074 #ifdef DEBUG_TP
00075     std::cerr<<__PRETTY_FUNCTION__<<": queue length: "<<m_curQueueLen<<std::endl;
00076 #endif
00077     m_queueLock.broadcastSignalNoLock();
00078     return t;
00079 }
00080 
00086 void CThreadPool::Thread::threadFunction()
00087 {
00088     CThreadPool::Task* t;
00089     m_running=true;
00090     do {
00091 #ifdef DEBUG_TP
00092         std::cerr<<__PRETTY_FUNCTION__<<": thread "<<(void*)this<<" is waiting for a task"<<std::endl;
00093 #endif
00094         t=m_pool->getTask();
00095 #ifdef DEBUG_TP
00096         std::cerr<<__PRETTY_FUNCTION__<<": thread "<<(void*)this<<" got task "<<(void*)t<<std::endl;
00097 #endif
00098         if(t) {
00099             t->run();
00100             delete t;
00101         }
00102 #ifdef DEBUG_TP
00103         std::cerr<<__PRETTY_FUNCTION__<<": thread "<<(void*)this<<" finished task "<<(void*)t<<std::endl;
00104 #endif
00105     } while(t);
00106 }
00107 
00108 }
00109 
00110 
00116 void container::CThreadPool::stop()
00117 {
00118     sptk::CGuard guard(m_tlLock);
00119     m_queueLock.lock();
00120     m_running=false;
00121     m_queueLock.unlock();
00122     m_queueLock.broadcastSignalNoLock();
00123     do {
00124         m_tlLock.waitForSignal();
00125     } while(!m_threads.empty());
00126 }
00127 
00128 
00132 void container::CThreadPool::threadDone(CThreadPool::Thread* thread)
00133 {
00134     sptk::CGuard guard(m_tlLock);
00135     for(threadlist_t::iterator it=m_threads.begin();
00136         it!=m_threads.end(); it++)
00137     {
00138         if(*it==thread) {
00139             //delete *it;
00140             m_threads.erase(it);
00141             break;
00142         }
00143     }
00144     m_tlLock.sendSignalNoLock();
00145 }
00146 
00147 
00151 void container::CThreadPool::Thread::onThreadExit()
00152 {
00153     m_pool->threadDone(this);
00154     delete this;
00155 }

SourceForge.net Logo