MetriVis | Overview | Download | User Manual | Development |
Reference | Overview | Design Documentation | Reference Backend | Reference Frontend |
00001 /* 00002 * MetriVis - Metrics Visualization Application 00003 * 00004 * 00005 * License notice: 00006 * 00007 * This program is free software: you can redistribute it and/or modify 00008 * it under the terms of the GNU General Public License as published by 00009 * the Free Software Foundation, either version 3 of the License, or 00010 * (at your option) any later version. 00011 * 00012 * This program is distributed in the hope that it will be useful, 00013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 * GNU General Public License for more details. 00016 * 00017 * You should have received a copy of the GNU General Public License 00018 * along with this program. If not, see <http://www.gnu.org/licenses/>. 00019 * 00020 */ 00021 00022 00029 #ifndef _METRIVIS_REQUEST_HANDLER_H 00030 #define _METRIVIS_REQUEST_HANDLER_H 00031 00032 #include <string> 00033 #include <queue> 00034 #include <stack> 00035 #include <boost/thread/xtime.hpp> 00036 #include <boost/bind.hpp> 00037 00038 #include "BaseClass.h" 00039 #include "RequestDescription.h" 00040 #include "lib/threadpool/threadpool.hpp" 00041 00042 00043 namespace metrivis { 00044 00045 00051 class RequestHandlerBase { 00052 00053 public: 00054 RequestHandlerBase(); 00055 00056 00057 virtual ~RequestHandlerBase(); 00058 00064 virtual void PushRequest(const RequestDescription& request_description); 00065 00066 00072 virtual void Terminate(); 00073 00074 // virtual void 00075 }; 00076 00077 00088 template <class T, int ID = -1> 00089 class RequestHandler : public RequestHandlerBase, public BaseClass { 00090 00091 public: 00092 00101 RequestHandler(int number_of_threads) 00102 : thread_pool_(number_of_threads) { 00103 InitObject("RequestHandler", ID); 00104 } 00105 00106 virtual ~RequestHandler() { 00107 } 00108 00109 00115 virtual void PushRequest(const RequestDescription& request_description) { 00116 Log(LOG_DEBUG, StrPrint("PushRequest for: %s", 00117 request_description.file_name().c_str())); 00118 00119 thread_pool_.PushRequest(request_description); 00120 } 00121 00122 00128 virtual void Terminate() { 00129 thread_pool_.Terminate(); 00130 Log(LOG_DEBUG, "RequestHandler terminated."); 00131 } 00132 00133 00134 private: 00135 int number_of_threads_; 00136 00151 template<class S> 00152 class ThreadPool { 00153 00154 public: 00155 00160 ThreadPool(int number_of_threads) { 00161 pool_.size_controller().resize(number_of_threads); 00162 for(int i = 0; i < number_of_threads; i++) { 00163 dispatchers_.push(new S(i)); 00164 } 00165 // Just created the pool. 00166 post_mortem_ = false; 00167 } 00168 00175 ~ThreadPool() { 00176 // Check if pool is uninitialized cleanly. 00177 if( !post_mortem_) { 00178 printf("ThreadPool::~ThreadPool() was not uninit cleanly. \n"); 00179 } 00180 00181 // Remove all dispatcher objects. 00182 while( !dispatchers_.empty()) { 00183 S* tmp_ref = dispatchers_.top(); 00184 dispatchers_.pop(); 00185 delete tmp_ref; 00186 } 00187 } 00188 00189 00198 void PushRequest(const RequestDescription& request_description) { 00199 request_desriptions_.push(request_description); 00200 00201 // Kick it on. 00202 ScheduleRequests(); 00203 } 00204 00205 00215 void ScheduleRequestsManually() { 00216 ScheduleRequests(); 00217 } 00218 00219 00227 void Terminate() { 00228 // std::queue doesn't know any clear() function. So pop them all off;) 00229 while( !request_desriptions_.empty()) { 00230 request_desriptions_.pop(); 00231 } 00232 // Remove all requests from the schedule. 00233 pool_.clear(); 00234 // Now wait until all running jobs finished. 00235 pool_.wait(); 00236 00237 // This pool has been successfully shut down. 00238 post_mortem_ = true; 00239 } 00240 00241 00242 protected: 00243 00248 const RequestDescription& PopRequest() const { 00249 return request_desriptions_.pop(); 00250 } 00251 00252 00253 private: 00254 00261 void ScheduleRequests() { 00262 pool_.schedule(boost::bind(&ThreadPool<S>::ProcessRequest, this)); 00263 } 00264 00265 00272 void ProcessRequest() { 00273 00274 S* dispatcher; 00275 // Now get a free dispatcher. 00276 { 00277 boost::mutex::scoped_lock lock(dispatcher_mutex_); 00278 // Make sure that there are dispatchers defined. 00279 assert(!dispatchers_.empty()); 00280 // Get the next dispatcher. 00281 dispatcher = dispatchers_.top(); 00282 dispatchers_.pop(); 00283 } 00284 00285 printf("[ThreadPool::ProcessRequest() - processing request]\n"); 00286 00287 // Assign request to worker. 00288 dispatcher->set_request_description(request_desriptions_.front()); 00289 request_desriptions_.pop(); 00290 00291 dispatcher->Init(); 00292 00293 // We do not directly spawn another thread in the dispatcher but execute 00294 // it from the current one. 00295 dispatcher->RunBlocking(); 00296 00297 dispatcher->Uninit(); 00298 00300 // boost::xtime xt; 00301 // boost::xtime_get(&xt, boost::TIME_UTC); 00302 // xt.sec += 4; 00303 // boost::thread::sleep(xt); 00304 00305 // Return the dispatcher after usage. 00306 { 00307 boost::mutex::scoped_lock lock(dispatcher_mutex_); 00308 dispatchers_.push(dispatcher); 00309 } 00310 } 00311 00312 00313 private: 00314 00315 boost::threadpool::pool pool_; 00316 std::queue<RequestDescription> request_desriptions_; 00317 00318 boost::mutex dispatcher_mutex_; 00319 std::stack<S*> dispatchers_; 00320 00321 bool post_mortem_; 00322 00323 }; // class ThreadPool 00324 00325 00326 ThreadPool<T> thread_pool_; 00327 00328 }; // class RequestHandler 00329 00330 00331 00332 } // namespace metrivis 00333 00334 #endif // _METRIVIS_REQUEST_HANDLER_H