Package gofer :: Package rmi :: Module threadpool
[hide private]
[frames] | no frames]

Source Code for Module gofer.rmi.threadpool

  1  # 
  2  # Copyright (c) 2011 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU Lesser General Public 
  5  # License as published by the Free Software Foundation; either version 
  6  # 2 of the License (LGPLv2) or (at your option) any later version. 
  7  # There is NO WARRANTY for this software, express or implied, 
  8  # including the implied warranties of MERCHANTABILITY, 
  9  # NON-INFRINGEMENT, or FITNESS FOR A PARTICULAR PURPOSE. You should 
 10  # have received a copy of LGPLv2 along with this software; if not, see 
 11  # http://www.gnu.org/licenses/old-licenses/lgpl-2.0.txt. 
 12  # 
 13  # Jeff Ortel <jortel@redhat.com> 
 14  # 
 15   
 16  """ 
 17  Thread Pool classes. 
 18  """ 
 19   
 20  from uuid import uuid4 
 21  from threading import Thread, RLock, Condition 
 22  from Queue import Queue, Empty 
 23  from logging import getLogger 
 24   
 25  from gofer import synchronized, conditional 
 26   
 27   
 28  log = getLogger(__name__) 
29 30 31 -class Worker(Thread):
32 """ 33 Pool (worker) thread. 34 @ivar pool: A thread pool. 35 @type pool: L{ThreadPool} 36 """ 37
38 - def __init__(self, id, pool):
39 """ 40 @param id: The worker id in the pool. 41 @type id: int 42 @param pool: A thread pool. 43 @type pool: L{ThreadPool} 44 """ 45 name = 'worker-%d' % id 46 Thread.__init__(self, name=name) 47 self.id = id 48 self.pool = pool 49 self.setDaemon(True)
50
51 - def run(self):
52 """ 53 Main run loop; processes input queue. 54 """ 55 while True: 56 call = self.pool.pending() 57 if not call: 58 # exit requested 59 self.pool = None 60 return 61 try: 62 call(self.pool) 63 except Exception: 64 log.exception(str(call))
65
66 67 -class Call:
68 """ 69 A call to be executed by the thread pool. 70 @ivar id: The unique call ID. 71 @type id: str 72 @ivar fn: The function/method to be executed. 73 @type fn: callable 74 @ivar args: The list of args passed to the callable. 75 @type args: list 76 @ivar kwargs: The list of keyword args passed to the callable. 77 @type kwargs: dict 78 """ 79
80 - def __init__(self, id, fn, args=None, kwargs=None):
81 """ 82 @param id: The unique call ID. 83 @type id: str 84 @param fn: The function/method to be executed. 85 @type fn: callable 86 @param args: The list of args passed to the callable. 87 @type args: tuple 88 @param kwargs: The list of keyword args passed to the callable. 89 @type kwargs: dict 90 """ 91 self.id = id 92 self.fn = fn 93 self.args = args or [] 94 self.kwargs = kwargs or {} 95 self.retval = None
96
97 - def __call__(self, pool):
98 """ 99 Execute the call. 100 @param pool: A thread pool. 101 @type pool: L{ThreadPool} 102 """ 103 try: 104 self.retval = self.fn(*self.args, **self.kwargs) 105 except Exception, e: 106 self.retval = e 107 pool.completed(self)
108
109 - def __str__(self):
110 s = ['call: '] 111 s.append('%s = ' % self.retval) 112 s.append(str(self.fn)) 113 s.append('(') 114 s.append(str(self.args)) 115 s.append(', ') 116 s.append(str(self.kwargs)) 117 s.append(')') 118 return ''.join(s)
119
120 121 -class IndexedQueue:
122 """ 123 Synchronized call queue with indexed search. 124 @ivar __condition: A condition used to synchronize the queue. 125 @type __condition: L{Condition} 126 @ivar __list: Provides fifo functionality. 127 @type __list: list 128 @ivar __dict: Provides indexed access. 129 @type __dict: dict 130 """ 131
132 - def __init__(self):
133 self.__condition = Condition() 134 self.__list = [] 135 self.__dict = {}
136 137 @conditional
138 - def put(self, call):
139 """ 140 Put a call and retval in the queue. 141 Signals threads waiting on the condition using get() or find(). 142 @param call: A call to enqueue. 143 @type call: L{Call} 144 """ 145 self.__list.insert(0, call) 146 self.__dict[call.id] = call 147 self.__notify()
148 149 @conditional
150 - def get(self, blocking=True, timeout=None):
151 """ 152 Read the next available call. 153 @param blocking: Block and wait when the queue is empty. 154 @type blocking: bool 155 @param timeout: The time to wait when the queue is empty. 156 @type timeout: int 157 @return: The next completed call. 158 @rtype: L{call} 159 """ 160 waited = False 161 while True: 162 if self.__list: 163 call = self.__list.pop() 164 self.__dict.pop(call.id) 165 return call 166 else: 167 if blocking: 168 if waited: 169 raise Empty() 170 self.__wait(timeout) 171 waited = True 172 else: 173 raise Empty()
174 175 @conditional
176 - def find(self, id, blocking=True, timeout=None):
177 """ 178 Find a call result by ID. 179 @param id: A call ID. 180 @type id: str 181 @param blocking: Block and wait when the queue is empty. 182 @type blocking: bool 183 @param timeout: The time to wait when the queue is empty. 184 @type timeout: int 185 @return: A completed call by call ID. 186 @rtype: L{call} 187 """ 188 waited = False 189 while True: 190 if self.__dict.has_key(id): 191 call = self.__dict.pop(id) 192 self.__list.remove(call) 193 return call 194 else: 195 if blocking: 196 if waited: 197 raise Empty() 198 self.__wait(timeout) 199 waited = True 200 else: 201 raise Empty()
202
203 - def __notify(self):
204 self.__condition.notify_all()
205
206 - def __wait(self, timeout):
207 self.__condition.wait(timeout)
208
209 - def __lock(self):
210 self.__condition.acquire()
211
212 - def __unlock(self):
213 self.__condition.release()
214 215 @conditional
216 - def __len__(self):
217 return len(self.__list)
218
219 220 -class ThreadPool:
221 """ 222 A load distributed thread pool. 223 @ivar min: The min # of workers. 224 @type min: int 225 @ivar max: The max # of workers. 226 @type max: int 227 @ivar __pending: The worker pending queue. 228 @type __pending: L{Queue} 229 @ivar __threads: The list of workers 230 @type __threads: list 231 @ivar __tracking: The job tracking dict(s) (<pending>,<completed>) 232 @type __tracking: tuple(2) 233 @ivar __mutex: The pool mutex. 234 @type __mutex: RLock 235 """ 236
237 - def __init__(self, min=1, max=1, duplex=True):
238 """ 239 @param min: The min # of workers. 240 @type min: int 241 @param max: The max # of workers. 242 @type max: int 243 @param duplex: Indicates that the pool supports 244 bidirectional communication. That is, call 245 results are queued. (default: True). 246 @type duplex: bool 247 """ 248 assert(min > 0) 249 assert(max >= min) 250 self.min = min 251 self.max = max 252 self.duplex = duplex 253 self.__mutex = RLock() 254 self.__pending = Queue() 255 self.__threads = [] 256 self.__tracking = ({}, IndexedQueue()) 257 for x in range(0, min): 258 self.__add()
259
260 - def run(self, fn, *args, **kwargs):
261 """ 262 Schedule a call. 263 Convenience method for scheduling. 264 @param fn: A function/method to execute. 265 @type fn: callable 266 @param args: The args passed to fn() 267 @type args: tuple 268 @param kwargs: The keyword args passed fn() 269 @type kwargs: dict 270 @return The call ID. 271 @rtype str 272 """ 273 id = uuid4() 274 call = Call(id, fn, args, kwargs) 275 return self.schedule(call)
276
277 - def schedule(self, call):
278 """ 279 Schedule a call. 280 @param call: A call to schedule for execution. 281 @param call: L{Call} 282 @return: The call ID. 283 @rtype: str 284 """ 285 self.__expand() 286 self.__track(call) 287 self.__pending.put(call) 288 return call.id
289
290 - def get(self, blocking=True, timeout=None):
291 """ 292 Get the results of I{calls} executed in the pool. 293 @param id: A job ID. 294 @type id: str 295 @param blocking: Block when queue is empty. 296 @type blocking: bool 297 @param timeout: The time (seconds) to block when empty. 298 @return: Call result (call, retval) 299 @rtype: tuple(2) 300 """ 301 return self.__tracking[1].get(blocking, timeout)
302
303 - def find(self, id=None, blocking=True, timeout=None):
304 """ 305 Find the results of I{calls} executed in the pool by job ID. 306 @param id: A job ID. 307 @type id: str 308 @param blocking: Block when queue is empty. 309 @type blocking: bool 310 @param timeout: The time (seconds) to block when empty. 311 @return: Call return value 312 @rtype: object 313 """ 314 return self.__tracking[1].find(id, blocking, timeout)
315
316 - def pending(self):
317 """ 318 Used by worker to get the next pending call. 319 @return: The next pending call. 320 @rtype L{Call} 321 """ 322 return self.__pending.get()
323
324 - def completed(self, call):
325 """ 326 Result received. 327 @param call: A call object. 328 @type call: Call 329 """ 330 if self.duplex: 331 self.__tracking[1].put(call) 332 self.__lock() 333 try: 334 self.__tracking[0].pop(call.id) 335 finally: 336 self.__unlock()
337
338 - def info(self):
339 """ 340 Get pool statistics. 341 @return: pool statistics 342 - capacity: number of allocated threads 343 - running: number of calls currently queued. 344 - completed: number of calls completed but return has not been 345 consumed using get() or find(). 346 @rtype: dict 347 """ 348 pending = self.__pending.qsize() 349 self.__lock() 350 try: 351 return dict( 352 capacity=len(self), 353 pending=pending, 354 running=len(self.__tracking[0]), 355 completed=len(self.__tracking[1]) 356 ) 357 finally: 358 self.__unlock()
359
360 - def shutdown(self):
361 """ 362 Shutdown the pool. 363 Terminate and join all workers. 364 """ 365 # send stop request 366 for n in range(0, len(self)): 367 self.__pending.put(0) 368 self.__lock() 369 # safely copy list of threads 370 try: 371 threads = self.__threads[:] 372 self.__threads = [] 373 finally: 374 self.__unlock() 375 # join stopped threads 376 for t in threads: 377 t.join()
378 379 @synchronized
380 - def __track(self, call):
381 """ 382 Call has been scheduled. 383 @param call: A call (id, fn, args, kwargs). 384 @type call: tuple(4) 385 """ 386 self.__tracking[0][call.id] = call
387 388 @synchronized
389 - def __add(self):
390 """ 391 Add a thread to the pool. 392 """ 393 total = len(self.__threads) 394 if total < self.max: 395 thread = Worker(total, self) 396 self.__threads.append(thread) 397 thread.start()
398 399 @synchronized
400 - def __expand(self):
401 """ 402 Expand the worker pool based on needed capacity. 403 """ 404 total = len(self.__threads) 405 queued = len(self.__tracking[0]) 406 capacity = (total-queued) 407 if capacity < 1: 408 self.__add()
409
410 - def __lock(self):
411 self.__mutex.acquire()
412
413 - def __unlock(self):
414 self.__mutex.release()
415 416 @synchronized
417 - def __len__(self):
418 return len(self.__threads)
419
420 - def __repr__(self):
421 return 'pool: %s' % self.info()
422
423 424 -class Immediate:
425 """ 426 Run (immediate) pool. 427 """ 428
429 - def run(self, fn, *args, **options):
430 """ 431 Run request. 432 @param fn: A function/method to execute. 433 @type fn: callable 434 @param args: The args passed to fn() 435 @type args: list 436 @param options: The keyword args passed fn() 437 @type options: dict 438 """ 439 fn(*args, **options) 440 return 0
441