1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 Thread Pool classes.
18 """
19
20 from uuid import uuid4
21 from threading import Thread, RLock, Condition
22 from Queue import Queue, Empty
23 from logging import getLogger
24
25 from gofer import synchronized, conditional
26
27
28 log = getLogger(__name__)
32 """
33 Pool (worker) thread.
34 @ivar pool: A thread pool.
35 @type pool: L{ThreadPool}
36 """
37
39 """
40 @param id: The worker id in the pool.
41 @type id: int
42 @param pool: A thread pool.
43 @type pool: L{ThreadPool}
44 """
45 name = 'worker-%d' % id
46 Thread.__init__(self, name=name)
47 self.id = id
48 self.pool = pool
49 self.setDaemon(True)
50
52 """
53 Main run loop; processes input queue.
54 """
55 while True:
56 call = self.pool.pending()
57 if not call:
58
59 self.pool = None
60 return
61 try:
62 call(self.pool)
63 except Exception:
64 log.exception(str(call))
65
68 """
69 A call to be executed by the thread pool.
70 @ivar id: The unique call ID.
71 @type id: str
72 @ivar fn: The function/method to be executed.
73 @type fn: callable
74 @ivar args: The list of args passed to the callable.
75 @type args: list
76 @ivar kwargs: The list of keyword args passed to the callable.
77 @type kwargs: dict
78 """
79
def __init__(self, id, fn, args=None, kwargs=None):
    """
    Create a call that has not been executed yet.
    @param id: The unique call ID.
    @type id: str
    @param fn: The function/method to be executed.
    @type fn: callable
    @param args: The positional args passed to the callable;
        a falsy value (None, empty) is stored as an empty list.
    @type args: tuple
    @param kwargs: The keyword args passed to the callable;
        a falsy value (None, empty) is stored as an empty dict.
    @type kwargs: dict
    """
    self.id, self.fn = id, fn
    # Normalize missing arguments so they can always be unpacked
    # with * / ** when the call is executed.
    self.args = args if args else []
    self.kwargs = kwargs if kwargs else {}
    # Filled in when the call is executed.
    self.retval = None
96
98 """
99 Execute the call.
100 @param pool: A thread pool.
101 @type pool: L{ThreadPool}
102 """
103 try:
104 self.retval = self.fn(*self.args, **self.kwargs)
105 except Exception, e:
106 self.retval = e
107 pool.completed(self)
108
110 s = ['call: ']
111 s.append('%s = ' % self.retval)
112 s.append(str(self.fn))
113 s.append('(')
114 s.append(str(self.args))
115 s.append(', ')
116 s.append(str(self.kwargs))
117 s.append(')')
118 return ''.join(s)
119
122 """
123 Synchronized call queue with indexed search.
124 @ivar __condition: A condition used to synchronize the queue.
125 @type __condition: L{Condition}
126 @ivar __list: Provides fifo functionality.
127 @type __list: list
128 @ivar __dict: Provides indexed access.
129 @type __dict: dict
130 """
131
133 self.__condition = Condition()
134 self.__list = []
135 self.__dict = {}
136
@conditional
def put(self, call):
    """
    Add a completed call to the queue and index it by its ID.
    Threads waiting in get() or find() are woken afterwards.
    @param call: The call to enqueue.
    @type call: L{Call}
    """
    # New calls enter at the head; get() takes from the tail,
    # which gives FIFO ordering.
    ordered = self.__list
    ordered.insert(0, call)
    self.__dict[call.id] = call
    self.__notify()
148
@conditional
def get(self, blocking=True, timeout=None):
    """
    Remove and return the next available call (FIFO order).
    When blocking, keep waiting until a call is available or the
    timeout expires. A wakeup that finds the queue still empty
    (e.g. another reader took the call) does not end the wait early.
    @param blocking: Block and wait when the queue is empty.
    @type blocking: bool
    @param timeout: The maximum time (seconds) to wait when the
        queue is empty; None imposes no deadline.
    @type timeout: int or float
    @return: The next completed call.
    @rtype: L{Call}
    @raise Empty: When no call is available and either blocking
        is False or the timeout has expired.
    """
    import time
    # NOTE(review): presumably @conditional holds self.__condition for
    # the whole call and __wait() releases it while waiting; confirm
    # against gofer.conditional.
    # An absolute deadline lets repeated waits share one timeout budget.
    deadline = None
    if timeout is not None:
        deadline = time.time() + timeout
    # Re-check after every wakeup: put() notifies all waiters.
    while not self.__list:
        if not blocking:
            raise Empty()
        if deadline is None:
            self.__wait(None)
        else:
            remaining = deadline - time.time()
            if remaining <= 0:
                raise Empty()
            self.__wait(remaining)
    # put() inserts at the head, so the oldest call is at the tail.
    call = self.__list.pop()
    self.__dict.pop(call.id)
    return call
174
@conditional
def find(self, id, blocking=True, timeout=None):
    """
    Remove and return the completed call with the given ID.
    When blocking, keep waiting until that call has been queued or
    the timeout expires. Wakeups caused by other calls being queued
    do not end the wait early.
    @param id: A call ID.
    @type id: str
    @param blocking: Block and wait when the call is not queued.
    @type blocking: bool
    @param timeout: The maximum time (seconds) to wait for the call;
        None imposes no deadline.
    @type timeout: int or float
    @return: The completed call with the requested ID.
    @rtype: L{Call}
    @raise Empty: When the call is not queued and either blocking
        is False or the timeout has expired.
    """
    import time
    # NOTE(review): presumably @conditional holds self.__condition for
    # the whole call and __wait() releases it while waiting; confirm
    # against gofer.conditional.
    # An absolute deadline lets repeated waits share one timeout budget.
    deadline = None
    if timeout is not None:
        deadline = time.time() + timeout
    # Re-check after every wakeup: put() notifies all waiters for any
    # call, not only the one requested here.
    while id not in self.__dict:
        if not blocking:
            raise Empty()
        if deadline is None:
            self.__wait(None)
        else:
            remaining = deadline - time.time()
            if remaining <= 0:
                raise Empty()
            self.__wait(remaining)
    call = self.__dict.pop(id)
    # Keep the FIFO list consistent with the ID index.
    self.__list.remove(call)
    return call
202
204 self.__condition.notify_all()
205
208
211
214
215 @conditional
218
221 """
222 A load distributed thread pool.
223 @ivar min: The min # of workers.
224 @type min: int
225 @ivar max: The max # of workers.
226 @type max: int
227 @ivar __pending: The worker pending queue.
228 @type __pending: L{Queue}
229 @ivar __threads: The list of workers
230 @type __threads: list
231 @ivar __tracking: The job tracking dict(s) (<pending>,<completed>)
232 @type __tracking: tuple(2)
233 @ivar __mutex: The pool mutex.
234 @type __mutex: RLock
235 """
236
def __init__(self, min=1, max=1, duplex=True):
    """
    Create the pool and start its initial workers.
    @param min: The min # of workers (must be > 0).
    @type min: int
    @param max: The max # of workers (must be >= min).
    @type max: int
    @param duplex: Indicates that the pool supports
        bidirectional communication. That is, call
        results are queued. (default: True).
    @type duplex: bool
    """
    assert min > 0
    assert max >= min
    self.min, self.max = min, max
    self.duplex = duplex
    self.__mutex = RLock()
    self.__pending = Queue()
    self.__threads = []
    # (scheduled calls keyed by call ID, queue of completed calls)
    self.__tracking = ({}, IndexedQueue())
    # Start with the minimum number of workers.
    for _ in range(min):
        self.__add()
259
def run(self, fn, *args, **kwargs):
    """
    Schedule fn(*args, **kwargs) for execution in the pool.
    Convenience wrapper around schedule(): it packs the callable
    and its arguments into a L{Call} tagged with a new uuid4() ID.
    @param fn: A function/method to execute.
    @type fn: callable
    @param args: The positional args passed to fn().
    @type args: tuple
    @param kwargs: The keyword args passed to fn().
    @type kwargs: dict
    @return: The call ID (the ID to pass to find()).
    @rtype: L{uuid.UUID}
    """
    call = Call(uuid4(), fn, args, kwargs)
    return self.schedule(call)
276
278 """
279 Schedule a call.
280 @param call: A call to schedule for execution.
281 @param call: L{Call}
282 @return: The call ID.
283 @rtype: str
284 """
285 self.__expand()
286 self.__track(call)
287 self.__pending.put(call)
288 return call.id
289
def get(self, blocking=True, timeout=None):
    """
    Take the next completed call from the results queue.
    Results are only queued when the pool was created with
    duplex=True; otherwise this never finds anything.
    @param blocking: Block when the results queue is empty.
    @type blocking: bool
    @param timeout: The time (seconds) to block when empty.
    @type timeout: int or float
    @return: The completed call; its retval attribute holds the
        return value (or the exception raised by the callable).
    @rtype: L{Call}
    """
    completed = self.__tracking[1]
    return completed.get(blocking, timeout)
302
def find(self, id=None, blocking=True, timeout=None):
    """
    Take the completed call with the given ID from the results queue.
    Results are only queued when the pool was created with
    duplex=True; otherwise this never finds anything.
    @param id: The call ID returned by run() or schedule().
    @type id: str
    @param blocking: Block when the requested call has not completed.
    @type blocking: bool
    @param timeout: The time (seconds) to block.
    @type timeout: int or float
    @return: The completed call; its retval attribute holds the
        return value (or the exception raised by the callable).
    @rtype: L{Call}
    """
    completed = self.__tracking[1]
    return completed.find(id, blocking, timeout)
315
317 """
318 Used by worker to get the next pending call.
319 @return: The next pending call.
320 @rtype L{Call}
321 """
322 return self.__pending.get()
323
325 """
326 Result received.
327 @param call: A call object.
328 @type call: Call
329 """
330 if self.duplex:
331 self.__tracking[1].put(call)
332 self.__lock()
333 try:
334 self.__tracking[0].pop(call.id)
335 finally:
336 self.__unlock()
337
339 """
340 Get pool statistics.
341 @return: pool statistics
342 - capacity: number of allocated threads
343 - running: number of calls currently queued.
344 - completed: number of calls completed but return has not been
345 consumed using get() or find().
346 @rtype: dict
347 """
348 pending = self.__pending.qsize()
349 self.__lock()
350 try:
351 return dict(
352 capacity=len(self),
353 pending=pending,
354 running=len(self.__tracking[0]),
355 completed=len(self.__tracking[1])
356 )
357 finally:
358 self.__unlock()
359
361 """
362 Shutdown the pool.
363 Terminate and join all workers.
364 """
365
366 for n in range(0, len(self)):
367 self.__pending.put(0)
368 self.__lock()
369
370 try:
371 threads = self.__threads[:]
372 self.__threads = []
373 finally:
374 self.__unlock()
375
376 for t in threads:
377 t.join()
378
379 @synchronized
381 """
382 Call has been scheduled.
383 @param call: A call (id, fn, args, kwargs).
384 @type call: tuple(4)
385 """
386 self.__tracking[0][call.id] = call
387
388 @synchronized
390 """
391 Add a thread to the pool.
392 """
393 total = len(self.__threads)
394 if total < self.max:
395 thread = Worker(total, self)
396 self.__threads.append(thread)
397 thread.start()
398
399 @synchronized
401 """
402 Expand the worker pool based on needed capacity.
403 """
404 total = len(self.__threads)
405 queued = len(self.__tracking[0])
406 capacity = (total-queued)
407 if capacity < 1:
408 self.__add()
409
412
415
416 @synchronized
418 return len(self.__threads)
419
421 return 'pool: %s' % self.info()
422
441