1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 """
18 Provides async AMQP message consumer classes.
19 """
20
21 import os
22 from time import sleep, time
23 from threading import Thread
24 from gofer import NAME, Singleton
25 from gofer.messaging import *
26 from gofer.rmi.dispatcher import Reply, Return, RemoteException
27 from gofer.rmi.policy import RequestTimeout
28 from gofer.messaging.consumer import Consumer
29 from gofer.messaging.producer import Producer
30 from logging import getLogger
31
32 log = getLogger(__name__)
33
34
36 """
37 A request, reply consumer.
38 @ivar listener: An reply listener.
39 @type listener: any
40 @ivar watchdog: An (optional) watchdog.
41 @type watchdog: L{WatchDog}
42 @ivar blacklist: A set of serial numbers to ignore.
43 @type blacklist: set
44 """
45
46 - def start(self, listener, watchdog=None):
47 """
48 Start processing messages on the queue and
49 forward to the listener.
50 @param listener: A reply listener.
51 @type listener: L{Listener}
52 @param watchdog: An (optional) watchdog.
53 @type watchdog: L{WatchDog}
54 """
55 self.listener = listener
56 self.watchdog = watchdog or LazyDog()
57 self.blacklist = set()
58 Consumer.start(self)
59
61 """
62 Dispatch received request.
63 The serial number of failed requests is added to the blacklist
64 help prevent dispatching both failure and success replies. The
65 primary cause of this is when the watchdog has replied on the agent's
66 behalf but the agent actually completes the request and later sends
67 a reply.
68 @param envelope: The received envelope.
69 @type envelope: L{Envelope}
70 """
71 try:
72 reply = Reply(envelope)
73 if envelope.sn in self.blacklist:
74
75 return
76 if reply.started():
77 self.watchdog.started(envelope.sn)
78 reply = Started(envelope)
79 reply.notify(self.listener)
80 return
81 if reply.progress():
82 self.watchdog.progress(envelope.sn)
83 reply = Progress(envelope)
84 reply.notify(self.listener)
85 return
86 if reply.succeeded():
87 self.blacklist.add(envelope.sn)
88 self.watchdog.completed(envelope.sn)
89 reply = Succeeded(envelope)
90 reply.notify(self.listener)
91 return
92 if reply.failed():
93 self.blacklist.add(envelope.sn)
94 self.watchdog.completed(envelope.sn)
95 reply = Failed(envelope)
96 reply.notify(self.listener)
97 return
98 except Exception:
99 log.exception(envelope)
100
101
103 """
104 Asynchronous request reply.
105 @ivar sn: The request serial number.
106 @type sn: str
107 @ivar origin: Which endpoint sent the reply.
108 @type origin: str
109 @ivar any: User defined (round-tripped) data.
110 @type any: object
111 """
112
114 """
115 @param envelope: The received envelope.
116 @type envelope: L{Envelope}
117 """
118 self.sn = envelope.sn
119 self.origin = envelope.routing[0]
120 self.any = envelope.any
121
123 """
124 Notify the specified listener.
125 @param listener: The listener to notify.
126 @type listener: L{Listener} or callable.
127 """
128 pass
129
131 s = []
132 s.append(self.__class__.__name__)
133 s.append(' sn : %s' % self.sn)
134 s.append(' origin : %s' % self.origin)
135 s.append(' user data : %s' % self.any)
136 return '\n'.join(s)
137
138
140 """
141 A (final) reply.
142 """
143
145 if callable(listener):
146 listener(self)
147 return
148 if self.succeeded():
149 listener.succeeded(self)
150 else:
151 listener.failed(self)
152
154 """
155 Get whether the reply indicates success.
156 @return: True when succeeded.
157 @rtype: bool
158 """
159 return False
160
162 """
163 Get whether the reply indicates failure.
164 @return: True when failed.
165 @rtype: bool
166 """
167 return ( not self.succeeded() )
168
170 """
171 Throw contained exception.
172 @raise Exception: When contained.
173 """
174 pass
175
176
178 """
179 Successful reply to asynchronous operation.
180 @ivar retval: The returned value.
181 @type retval: object
182 """
183
185 """
186 @param envelope: The received envelope.
187 @type envelope: L{Envelope}
188 """
189 AsyncReply.__init__(self, envelope)
190 reply = Return(envelope.result)
191 self.retval = reply.retval
192
195
197 s = []
198 s.append(AsyncReply.__str__(self))
199 s.append(' retval:')
200 s.append(str(self.retval))
201 return '\n'.join(s)
202
203
205 """
206 Failed reply to asynchronous operation. This reply
207 indicates an exception was raised.
208 @ivar exval: The returned exception.
209 @type exval: object
210 @see: L{Failed.throw}
211 """
212
214 """
215 @param envelope: The received envelope.
216 @type envelope: L{Envelope}
217 """
218 AsyncReply.__init__(self, envelope)
219 reply = Return(envelope.result)
220 self.exval = RemoteException.instance(reply)
221 self.xmodule = reply.xmodule,
222 self.xclass = reply.xclass
223 self.xstate = reply.xstate
224 self.xargs = reply.xargs
225
228
230 s = []
231 s.append(AsyncReply.__str__(self))
232 s.append(' exval: %s' % str(self.exval))
233 s.append(' xmodule: %s' % self.xmodule)
234 s.append(' xclass: %s' % self.xclass)
235 s.append(' xstate: %s' % self.xstate)
236 s.append(' xargs: %s' % self.xargs)
237 return '\n'.join(s)
238
239
241 """
242 An asynchronous operation started.
243 @see: L{Failed.throw}
244 """
245
247 if callable(listener):
248 listener(self)
249 else:
250 listener.started(self)
251
257
258
260 """
261 Progress reported for an asynchronous operation.
262 @ivar total: The total number of units.
263 @type total: int
264 @ivar completed: The total number of completed units.
265 @type completed: int
266 @ivar details: Optional information about the progress.
267 @type details: object
268 @see: L{Failed.throw}
269 """
270
272 """
273 @param envelope: The received envelope.
274 @type envelope: L{Envelope}
275 """
276 AsyncReply.__init__(self, envelope)
277 self.total = envelope.total
278 self.completed = envelope.completed
279 self.details = envelope.details
280
282 if callable(listener):
283 listener(self)
284 else:
285 listener.progress(self)
286
288 s = []
289 s.append(AsyncReply.__str__(self))
290 s.append(' total: %s' % str(self.total))
291 s.append(' completed: %s' % str(self.completed))
292 s.append(' details: %s' % str(self.details))
293 return '\n'.join(s)
294
295
297 """
298 An asynchronous operation callback listener.
299 """
300
302 """
303 Async request succeeded.
304 @param reply: The reply data.
305 @type reply: L{Succeeded}.
306 """
307 pass
308
310 """
311 Async request failed (raised an exception).
312 @param reply: The reply data.
313 @type reply: L{Failed}.
314 """
315 pass
316
318 """
319 Async request has started.
320 @param reply: The request.
321 @type reply: L{Started}.
322 """
323 pass
324
326 """
327 Async progress report.
328 @param reply: The request.
329 @type reply: L{Progress}.
330 """
331 pass
332
333
335 """
336 A watchdog object used to track asynchronous messages
337 by serial number. Tracking is persisted using journal files.
338 @ivar url: The AMQP broker URL.
339 @type url: str
340 @ivar __jnl: A journal use for persistence.
341 @type __jnl: L{Journal}
342 @ivar __producer: An AMQP message producer.
343 @type __producer: L{Producer}
344 @ivar __run: Run flag.
345 @type __run: bool
346 """
347
348 __metaclass__ = Singleton
349
350 URL = Producer.LOCALHOST
351
353 """
354 @param url: The (optional) broker URL.
355 @type url: str
356 @param journal: A journal object (default: Journal()).
357 @type journal: L{Journal}
358 """
359 self.url = url
360 self.__producer = None
361 self.__jnl = (journal or Journal())
362
364 """
365 Start a watchdog thread.
366 @return: The started thread.
367 @rtype: L{WatchDogThread}
368 """
369 thread = WatchDogThread(self)
370 thread.start()
371 return thread
372
373 - def track(self, sn, replyto, any, timeout):
374 """
375 Add a request by serial number for tacking.
376 @param sn: A serial number.
377 @type sn: str
378 @param replyto: An AMQP address.
379 @type replyto: str
380 @param any: User defined data.
381 @type any: any
382 @param timeout: A timeout (start,complete)
383 @type timeout: tuple(2)
384 """
385 now = time()
386 ts = (now+timeout[0], now+timeout[1])
387 je = self.__jnl.write(sn, replyto, any, ts)
388 log.info('tracking: %s', je)
389
391 """
392 Timeout is a tuple of: (start,complete).
393 A proper status='started' has been received and the timout
394 index is changed from 0 to 1. This switches the timeout logic
395 to work off the 2nd timeout which indicates the completion timeout.
396 @param sn: An entry serial number.
397 @type sn: str
398 """
399 log.info(sn)
400 je = self.__jnl.find(sn)
401 if je:
402 self.__jnl.update(sn, idx=1)
403 else:
404 pass
405
407 """
408 Progress reporting received.
409 Because a progress report has been received, the
410 current timestamp is bumped 5 seconds only if the timestamp
411 is within 5 seconds of expiration.
412 """
413 log.info(sn)
414 je = self.__jnl.find(sn)
415 if not je:
416
417 return
418 if je.idx != 1:
419
420 return
421 grace_period = time()+5
422 if grace_period > je.ts[1]:
423 ts = (je.ts[0], grace_period)
424 self.__jnl.update(sn, ts=ts)
425
427 """
428 The request has been properly completed by the agent.
429 Tracking is discontinued.
430 @param sn: An entry serial number.
431 @type sn: str
432 """
433 log.info(sn)
434 self.__jnl.delete(sn)
435
437 """
438 Process all I{outstanding} journal entries.
439 When a journal entry (timeout) is detected, a L{RequestTimeout}
440 exception is raised and sent to the I{replyto} AMQP address.
441 The journal entry is deleted.
442 """
443 sent = []
444 now = time()
445 if self.__producer is None:
446 self.__producer = Producer(url=self.url)
447 for sn,je in self.__jnl.load().items():
448 if now > je.ts[je.idx]:
449 sent.append(sn)
450 try:
451 raise RequestTimeout(sn, je.idx)
452 except:
453 self.__overdue(je)
454 for sn in sent:
455 self.__jnl.delete(sn)
456
458 """
459 Send the (timeout) reply to the I{replyto} AMQP address
460 specified in the journal entry.
461 @param je: A journal entry.
462 @type je: Entry
463 """
464 log.info('sn:%s timeout detected', je.sn)
465 try:
466 self.__sendreply(je)
467 except:
468 log.exception(str(je))
469
471 """
472 Send the (timeout) reply to the I{replyto} AMQP address
473 specified in the journal entry.
474 @param je: A journal entry.
475 @type je: Entry
476 """
477 sn = je.sn
478 replyto = je.replyto
479 any = je.any
480 result = Return.exception()
481 log.info('send (timeout) for sn:%s to:%s', sn, replyto)
482 self.__producer.send(
483 replyto,
484 sn=sn,
485 any=any,
486 result=result,
487 watchdog=self.__producer.uuid)
488
489
491 """
492 Watchdog thread.
493 """
494
500
510
512 self.__run = False
513 return self
514
515
517 """
518 Async message journal
519 @ivar root: The root journal directory.
520 @type root: str
521 @cvar ROOT: The default journal directory root.
522 @type ROOT: str
523 Entry:
524 - sn: serial number
525 - replyto: reply to amqp address.
526 - any: user data
527 - timeout: (start<ctime>, complete<ctime>)
528 - idx: current timeout index.
529 """
530
531 ROOT = '/tmp/%s/journal/watchdog' % NAME
532
534 """
535 @param root: A journal root directory path.
536 @type root: str
537 """
538 self.root = root
539 self.__mkdir()
540
542 """
543 Load all journal entries.
544 @return: A dict of journal entries.
545 @rtype: dict
546 """
547 entries = {}
548 for fn in os.listdir(self.root):
549 path = os.path.join(self.root, fn)
550 if os.path.isdir(path):
551 continue
552 je = self.__read(path)
553 if not je:
554 continue
555 entries[je.sn] = je
556 return entries
557
558 - def write(self, sn, replyto, any, ts):
559 """
560 Write a new journal entry.
561 @param sn: A serial number.
562 @type sn: str
563 @param replyto: An AMQP address.
564 @type replyto: str
565 @param any: User defined data.
566 @type any: any
567 @param ts: A timeout (start<ctime>,complete<ctime>)
568 @type ts: tuple(2)
569 """
570 je = Envelope(
571 sn=sn,
572 replyto=replyto,
573 any=any,
574 ts=ts,
575 idx=0)
576 self.__write(je)
577 return je
578
579 - def update(self, sn, **property):
580 """
581 Update a journal entry for the specified I{sn}.
582 @param sn: An entry serial number.
583 @type sn: str
584 @param property: properties to update.
585 @type property: dict
586 @return: The updated journal entry
587 @rtype: Entry
588 @raise KeyError: On invalid key.
589 """
590 je = self.find(sn)
591 if not je:
592 return None
593 for k,v in property.items():
594 if k in ('sn',):
595 continue
596 if k in je:
597 je[k] = v
598 else:
599 raise KeyError(k)
600 self.__write(je)
601 return je
602
604 """
605 Delete a journal entry by serial number.
606 @param sn: An entry serial number.
607 @type sn: str
608 """
609 fn = self.__fn(sn)
610 path = os.path.join(self.root, fn)
611 self.__unlink(path)
612
613 - def find(self, sn):
614 """
615 Find a journal entry my serial number.
616 @param sn: An entry serial number.
617 @type sn: str
618 @return: The journal entry.
619 @rtype: Entry
620 """
621 try:
622 fn = self.__fn(sn)
623 path = os.path.join(self.root, fn)
624 return self.__read(path)
625 except (IOError, OSError):
626 log.debug(sn, exc_info=1)
627
628 - def __fn(self, sn):
629 """
630 File name.
631 @param sn: An entry serial number.
632 @type sn: str
633 @return: The journal file name by serial number.
634 @rtype: str
635 """
636 return '%s.jnl' % sn
637
639 """
640 Read the journal file at the specified I{path}.
641 @param path: A journal file path.
642 @type path: str
643 @return: A journal entry.
644 @rtype: Entry
645 """
646 f = open(path)
647 try:
648 try:
649 je = Envelope()
650 je.load(f.read())
651 return je
652 except:
653 log.exception(path)
654 self.__unlink(path)
655 finally:
656 f.close()
657
659 """
660 Write the specified journal entry.
661 @param je: A journal entry
662 @type je: Entry
663 """
664 path = os.path.join(self.root, self.__fn(je.sn))
665 f = open(path, 'w')
666 try:
667 f.write(je.dump())
668 finally:
669 f.close
670
672 """
673 Unlink (delete) the journal file at the specified I{path}.
674 @param path: A journal file path.
675 @type path: str
676 """
677 try:
678 os.unlink(path)
679 except OSError:
680 log.debug(path, exc_info=1)
681
683 """
684 Ensure the directory exists.
685 """
686 if not os.path.exists(self.root):
687 os.makedirs(self.root)
688
689
691 """
692 A lazy (good-for-nothing) watchdog.
693 Basically a watchdog that does not do anything.
694 """
695
696 - def track(self, sn, replyto, any, timeout):
698
701
704
707