1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16  """ 
 17  Thread Pool classes. 
 18  """ 
 19   
 20  from uuid import uuid4 
 21  from threading import Thread, RLock, Condition 
 22  from Queue import Queue, Empty 
 23  from logging import getLogger 
 24   
 25  from gofer import synchronized, conditional 
 26   
 27   
 28  log = getLogger(__name__) 
 32      """ 
 33      Pool (worker) thread. 
 34      @ivar pool: A thread pool. 
 35      @type pool: L{ThreadPool} 
 36      """ 
 37       
 39          """ 
 40          @param id: The worker id in the pool. 
 41          @type id: int 
 42          @param pool: A thread pool. 
 43          @type pool: L{ThreadPool} 
 44          """ 
 45          name = 'worker-%d' % id 
 46          Thread.__init__(self, name=name) 
 47          self.id = id 
 48          self.pool = pool 
 49          self.setDaemon(True) 
  50           
 52          """ 
 53          Main run loop; processes input queue. 
 54          """ 
 55          while True: 
 56              call = self.pool.pending() 
 57              if not call: 
 58                   
 59                  self.pool = None 
 60                  return 
 61              try: 
 62                  call(self.pool) 
 63              except Exception: 
 64                  log.exception(str(call)) 
   65   
 68      """ 
 69      A call to be executed by the thread pool. 
 70      @ivar id: The unique call ID. 
 71      @type id: str 
 72      @ivar fn: The function/method to be executed. 
 73      @type fn: callable 
 74      @ivar args: The list of args passed to the callable. 
 75      @type args: list 
 76      @ivar kwargs: The list of keyword args passed to the callable. 
 77      @type kwargs: dict 
 78      """ 
 79   
 80 -    def __init__(self, id, fn, args=None, kwargs=None): 
  81          """ 
 82          @param id: The unique call ID. 
 83          @type id: str 
 84          @param fn: The function/method to be executed. 
 85          @type fn: callable 
 86          @param args: The list of args passed to the callable. 
 87          @type args: tuple 
 88          @param kwargs: The list of keyword args passed to the callable. 
 89          @type kwargs: dict 
 90          """ 
 91          self.id = id 
 92          self.fn = fn 
 93          self.args = args or [] 
 94          self.kwargs = kwargs or {} 
 95          self.retval = None 
  96   
 98          """ 
 99          Execute the call. 
100          @param pool: A thread pool. 
101          @type pool: L{ThreadPool} 
102          """ 
103          try: 
104              self.retval = self.fn(*self.args, **self.kwargs) 
105          except Exception, e: 
106              self.retval = e 
107          pool.completed(self) 
 108   
110          s = ['call: '] 
111          s.append('%s = ' % self.retval) 
112          s.append(str(self.fn)) 
113          s.append('(') 
114          s.append(str(self.args)) 
115          s.append(', ') 
116          s.append(str(self.kwargs)) 
117          s.append(')') 
118          return ''.join(s) 
  119   
122      """ 
123      Synchronized call queue with indexed search. 
124      @ivar __condition: A condition used to synchronize the queue. 
125      @type __condition: L{Condition} 
126      @ivar __list: Provides fifo functionality. 
127      @type __list: list 
128      @ivar __dict: Provides indexed access. 
129      @type __dict: dict 
130      """ 
131   
133          self.__condition = Condition() 
134          self.__list = [] 
135          self.__dict = {} 
 136   
137      @conditional 
138 -    def put(self, call): 
 139          """ 
140          Put a call and retval in the queue. 
141          Signals threads waiting on the condition using get() or find(). 
142          @param call: A call to enqueue. 
143          @type call: L{Call} 
144          """ 
145          self.__list.insert(0, call) 
146          self.__dict[call.id] = call 
147          self.__notify() 
 148   
149      @conditional 
150 -    def get(self, blocking=True, timeout=None): 
 151          """ 
152          Read the next available call. 
153          @param blocking: Block and wait when the queue is empty. 
154          @type blocking: bool 
155          @param timeout: The time to wait when the queue is empty. 
156          @type timeout: int 
157          @return: The next completed call. 
158          @rtype: L{call} 
159          """ 
160          waited = False 
161          while True: 
162              if self.__list: 
163                  call = self.__list.pop() 
164                  self.__dict.pop(call.id) 
165                  return call 
166              else: 
167                  if blocking: 
168                      if waited: 
169                          raise Empty() 
170                      self.__wait(timeout) 
171                      waited = True 
172                  else: 
173                      raise Empty() 
 174   
175      @conditional 
176 -    def find(self, id, blocking=True, timeout=None): 
 177          """ 
178          Find a call result by ID. 
179          @param id: A call ID. 
180          @type id: str 
181          @param blocking: Block and wait when the queue is empty. 
182          @type blocking: bool 
183          @param timeout: The time to wait when the queue is empty. 
184          @type timeout: int 
185          @return: A completed call by call ID. 
186          @rtype: L{call} 
187          """ 
188          waited = False 
189          while True: 
190              if self.__dict.has_key(id): 
191                  call = self.__dict.pop(id) 
192                  self.__list.remove(call) 
193                  return call 
194              else: 
195                  if blocking: 
196                      if waited: 
197                          raise Empty() 
198                      self.__wait(timeout) 
199                      waited = True 
200                  else: 
201                      raise Empty() 
 202   
204          self.__condition.notify_all() 
 205   
208   
211   
214   
215      @conditional 
 218   
221      """ 
222      A load distributed thread pool. 
223      @ivar min: The min # of workers. 
224      @type min: int 
225      @ivar max: The max # of workers. 
226      @type max: int 
227      @ivar __pending: The worker pending queue. 
228      @type __pending: L{Queue} 
229      @ivar __threads: The list of workers 
230      @type __threads: list 
231      @ivar __tracking: The job tracking dict(s) (<pending>,<completed>) 
232      @type __tracking: tuple(2) 
233      @ivar __mutex: The pool mutex. 
234      @type __mutex: RLock 
235      """ 
236   
237 -    def __init__(self, min=1, max=1, duplex=True): 
 238          """ 
239          @param min: The min # of workers. 
240          @type min: int 
241          @param max: The max # of workers. 
242          @type max: int 
243          @param duplex: Indicates that the pool supports 
244              bidirectional communication.  That is, call 
245              results are queued. (default: True). 
246          @type duplex: bool 
247          """ 
248          assert(min > 0) 
249          assert(max >= min) 
250          self.min = min 
251          self.max = max 
252          self.duplex = duplex 
253          self.__mutex = RLock() 
254          self.__pending = Queue() 
255          self.__threads = [] 
256          self.__tracking = ({}, IndexedQueue()) 
257          for x in range(0, min): 
258              self.__add() 
 259           
260 -    def run(self, fn, *args, **kwargs): 
 261          """ 
262          Schedule a call. 
263          Convenience method for scheduling. 
264          @param fn: A function/method to execute. 
265          @type fn: callable 
266          @param args: The args passed to fn() 
267          @type args: tuple 
268          @param kwargs: The keyword args passed fn() 
269          @type kwargs: dict 
270          @return The call ID. 
271          @rtype str 
272          """ 
273          id = uuid4() 
274          call = Call(id, fn, args, kwargs) 
275          return self.schedule(call) 
 276   
278          """ 
279          Schedule a call. 
280          @param call: A call to schedule for execution. 
 281          @type call: L{Call} 
282          @return: The call ID. 
283          @rtype: str 
284          """ 
285          self.__expand() 
286          self.__track(call) 
287          self.__pending.put(call) 
288          return call.id 
 289   
290 -    def get(self, blocking=True, timeout=None): 
 291          """ 
292          Get the results of I{calls} executed in the pool. 
293          @param id: A job ID. 
294          @type id: str 
295          @param blocking: Block when queue is empty. 
296          @type blocking: bool 
297          @param timeout: The time (seconds) to block when empty. 
298          @return: Call result (call, retval) 
299          @rtype: tuple(2) 
300          """ 
301          return self.__tracking[1].get(blocking, timeout) 
 302   
303 -    def find(self, id=None, blocking=True, timeout=None): 
 304          """ 
305          Find the results of I{calls} executed in the pool by job ID. 
306          @param id: A job ID. 
307          @type id: str 
308          @param blocking: Block when queue is empty. 
309          @type blocking: bool 
310          @param timeout: The time (seconds) to block when empty. 
311          @return: Call return value 
312          @rtype: object 
313          """ 
314          return self.__tracking[1].find(id, blocking, timeout) 
 315   
317          """ 
318          Used by worker to get the next pending call. 
319          @return: The next pending call. 
 320          @rtype: L{Call} 
321          """ 
322          return self.__pending.get() 
 323   
325          """ 
326          Result received. 
327          @param call: A call object. 
328          @type call: Call 
329          """ 
330          if self.duplex: 
331              self.__tracking[1].put(call) 
332          self.__lock() 
333          try: 
334              self.__tracking[0].pop(call.id) 
335          finally: 
336              self.__unlock() 
 337   
339          """ 
340          Get pool statistics. 
341          @return: pool statistics 
342              - capacity: number of allocated threads 
343              - running: number of calls currently queued. 
344              - completed: number of calls completed but return has not been 
345                           consumed using get() or find(). 
346          @rtype: dict 
347          """ 
348          pending = self.__pending.qsize() 
349          self.__lock() 
350          try: 
351              return dict( 
352                  capacity=len(self), 
353                  pending=pending, 
354                  running=len(self.__tracking[0]), 
355                  completed=len(self.__tracking[1]) 
356              ) 
357          finally: 
358              self.__unlock() 
 359   
361          """ 
362          Shutdown the pool. 
363          Terminate and join all workers. 
364          """ 
365           
366          for n in range(0, len(self)): 
367              self.__pending.put(0) 
368          self.__lock() 
369           
370          try: 
371              threads = self.__threads[:] 
372              self.__threads = [] 
373          finally: 
374              self.__unlock() 
375           
376          for t in threads: 
377              t.join() 
 378   
379      @synchronized 
381          """ 
382          Call has been scheduled. 
383          @param call: A call (id, fn, args, kwargs). 
384          @type call: tuple(4) 
385          """ 
386          self.__tracking[0][call.id] = call 
 387   
388      @synchronized 
390          """ 
391          Add a thread to the pool. 
392          """ 
393          total = len(self.__threads) 
394          if total < self.max: 
395              thread = Worker(total, self) 
396              self.__threads.append(thread) 
397              thread.start() 
 398   
399      @synchronized 
401          """ 
402          Expand the worker pool based on needed capacity. 
403          """ 
404          total = len(self.__threads) 
405          queued = len(self.__tracking[0]) 
406          capacity = (total-queued) 
407          if capacity < 1: 
408              self.__add() 
 409       
412   
415   
416      @synchronized 
418          return len(self.__threads) 
 419   
421          return 'pool: %s' % self.info() 
  422   
441