Package gofer :: Package rmi :: Module async
[hide private]
[frames] | no frames]

Source Code for Module gofer.rmi.async

  1  # 
  2  # Copyright (c) 2011 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU Lesser General Public 
  5  # License as published by the Free Software Foundation; either version 
  6  # 2 of the License (LGPLv2) or (at your option) any later version. 
  7  # There is NO WARRANTY for this software, express or implied, 
  8  # including the implied warranties of MERCHANTABILITY, 
  9  # NON-INFRINGEMENT, or FITNESS FOR A PARTICULAR PURPOSE. You should 
 10  # have received a copy of LGPLv2 along with this software; if not, see 
 11  # http://www.gnu.org/licenses/old-licenses/lgpl-2.0.txt. 
 12  # 
 13  # Jeff Ortel <jortel@redhat.com> 
 14  # 
 15   
 16   
 17  """ 
 18  Provides async AMQP message consumer classes. 
 19  """ 
 20   
 21  import os 
 22  from time import sleep, time 
 23  from threading import Thread 
 24  from gofer import NAME, Singleton 
 25  from gofer.messaging import * 
 26  from gofer.rmi.dispatcher import Reply, Return, RemoteException 
 27  from gofer.rmi.policy import RequestTimeout 
 28  from gofer.messaging.consumer import Consumer 
 29  from gofer.messaging.producer import Producer 
 30  from logging import getLogger 
 31   
 32  log = getLogger(__name__) 
 33   
 34   
35 -class ReplyConsumer(Consumer):
36 """ 37 A request, reply consumer. 38 @ivar listener: An reply listener. 39 @type listener: any 40 @ivar watchdog: An (optional) watchdog. 41 @type watchdog: L{WatchDog} 42 @ivar blacklist: A set of serial numbers to ignore. 43 @type blacklist: set 44 """ 45
46 - def start(self, listener, watchdog=None):
47 """ 48 Start processing messages on the queue and 49 forward to the listener. 50 @param listener: A reply listener. 51 @type listener: L{Listener} 52 @param watchdog: An (optional) watchdog. 53 @type watchdog: L{WatchDog} 54 """ 55 self.listener = listener 56 self.watchdog = watchdog or LazyDog() 57 self.blacklist = set() 58 Consumer.start(self)
59
60 - def dispatch(self, envelope):
61 """ 62 Dispatch received request. 63 The serial number of failed requests is added to the blacklist 64 help prevent dispatching both failure and success replies. The 65 primary cause of this is when the watchdog has replied on the agent's 66 behalf but the agent actually completes the request and later sends 67 a reply. 68 @param envelope: The received envelope. 69 @type envelope: L{Envelope} 70 """ 71 try: 72 reply = Reply(envelope) 73 if envelope.sn in self.blacklist: 74 # ignored 75 return 76 if reply.started(): 77 self.watchdog.started(envelope.sn) 78 reply = Started(envelope) 79 reply.notify(self.listener) 80 return 81 if reply.progress(): 82 self.watchdog.progress(envelope.sn) 83 reply = Progress(envelope) 84 reply.notify(self.listener) 85 return 86 if reply.succeeded(): 87 self.blacklist.add(envelope.sn) 88 self.watchdog.completed(envelope.sn) 89 reply = Succeeded(envelope) 90 reply.notify(self.listener) 91 return 92 if reply.failed(): 93 self.blacklist.add(envelope.sn) 94 self.watchdog.completed(envelope.sn) 95 reply = Failed(envelope) 96 reply.notify(self.listener) 97 return 98 except Exception: 99 log.exception(envelope)
100 101
102 -class AsyncReply:
103 """ 104 Asynchronous request reply. 105 @ivar sn: The request serial number. 106 @type sn: str 107 @ivar origin: Which endpoint sent the reply. 108 @type origin: str 109 @ivar any: User defined (round-tripped) data. 110 @type any: object 111 """ 112
113 - def __init__(self, envelope):
114 """ 115 @param envelope: The received envelope. 116 @type envelope: L{Envelope} 117 """ 118 self.sn = envelope.sn 119 self.origin = envelope.routing[0] 120 self.any = envelope.any
121
122 - def notify(self, listener):
123 """ 124 Notify the specified listener. 125 @param listener: The listener to notify. 126 @type listener: L{Listener} or callable. 127 """ 128 pass
129
130 - def __str__(self):
131 s = [] 132 s.append(self.__class__.__name__) 133 s.append(' sn : %s' % self.sn) 134 s.append(' origin : %s' % self.origin) 135 s.append(' user data : %s' % self.any) 136 return '\n'.join(s)
137 138
139 -class FinalReply(AsyncReply):
140 """ 141 A (final) reply. 142 """ 143
144 - def notify(self, listener):
145 if callable(listener): 146 listener(self) 147 return 148 if self.succeeded(): 149 listener.succeeded(self) 150 else: 151 listener.failed(self)
152
153 - def succeeded(self):
154 """ 155 Get whether the reply indicates success. 156 @return: True when succeeded. 157 @rtype: bool 158 """ 159 return False
160
161 - def failed(self):
162 """ 163 Get whether the reply indicates failure. 164 @return: True when failed. 165 @rtype: bool 166 """ 167 return ( not self.succeeded() )
168
169 - def throw(self):
170 """ 171 Throw contained exception. 172 @raise Exception: When contained. 173 """ 174 pass
175 176
177 -class Succeeded(FinalReply):
178 """ 179 Successful reply to asynchronous operation. 180 @ivar retval: The returned value. 181 @type retval: object 182 """ 183
184 - def __init__(self, envelope):
185 """ 186 @param envelope: The received envelope. 187 @type envelope: L{Envelope} 188 """ 189 AsyncReply.__init__(self, envelope) 190 reply = Return(envelope.result) 191 self.retval = reply.retval
192
193 - def succeeded(self):
194 return True
195
196 - def __str__(self):
197 s = [] 198 s.append(AsyncReply.__str__(self)) 199 s.append(' retval:') 200 s.append(str(self.retval)) 201 return '\n'.join(s)
202 203
204 -class Failed(FinalReply):
205 """ 206 Failed reply to asynchronous operation. This reply 207 indicates an exception was raised. 208 @ivar exval: The returned exception. 209 @type exval: object 210 @see: L{Failed.throw} 211 """ 212
213 - def __init__(self, envelope):
214 """ 215 @param envelope: The received envelope. 216 @type envelope: L{Envelope} 217 """ 218 AsyncReply.__init__(self, envelope) 219 reply = Return(envelope.result) 220 self.exval = RemoteException.instance(reply) 221 self.xmodule = reply.xmodule, 222 self.xclass = reply.xclass 223 self.xstate = reply.xstate 224 self.xargs = reply.xargs
225
226 - def throw(self):
227 raise self.exval
228
229 - def __str__(self):
230 s = [] 231 s.append(AsyncReply.__str__(self)) 232 s.append(' exval: %s' % str(self.exval)) 233 s.append(' xmodule: %s' % self.xmodule) 234 s.append(' xclass: %s' % self.xclass) 235 s.append(' xstate: %s' % self.xstate) 236 s.append(' xargs: %s' % self.xargs) 237 return '\n'.join(s)
238 239
240 -class Started(AsyncReply):
241 """ 242 An asynchronous operation started. 243 @see: L{Failed.throw} 244 """ 245
246 - def notify(self, listener):
247 if callable(listener): 248 listener(self) 249 else: 250 listener.started(self)
251
252 - def __str__(self):
253 s = [] 254 s.append(AsyncReply.__str__(self)) 255 s.append('started') 256 return '\n'.join(s)
257 258
259 -class Progress(AsyncReply):
260 """ 261 Progress reported for an asynchronous operation. 262 @ivar total: The total number of units. 263 @type total: int 264 @ivar completed: The total number of completed units. 265 @type completed: int 266 @ivar details: Optional information about the progress. 267 @type details: object 268 @see: L{Failed.throw} 269 """ 270
271 - def __init__(self, envelope):
272 """ 273 @param envelope: The received envelope. 274 @type envelope: L{Envelope} 275 """ 276 AsyncReply.__init__(self, envelope) 277 self.total = envelope.total 278 self.completed = envelope.completed 279 self.details = envelope.details
280
281 - def notify(self, listener):
282 if callable(listener): 283 listener(self) 284 else: 285 listener.progress(self)
286
287 - def __str__(self):
288 s = [] 289 s.append(AsyncReply.__str__(self)) 290 s.append(' total: %s' % str(self.total)) 291 s.append(' completed: %s' % str(self.completed)) 292 s.append(' details: %s' % str(self.details)) 293 return '\n'.join(s)
294 295
296 -class Listener:
297 """ 298 An asynchronous operation callback listener. 299 """ 300
301 - def succeeded(self, reply):
302 """ 303 Async request succeeded. 304 @param reply: The reply data. 305 @type reply: L{Succeeded}. 306 """ 307 pass
308
309 - def failed(self, reply):
310 """ 311 Async request failed (raised an exception). 312 @param reply: The reply data. 313 @type reply: L{Failed}. 314 """ 315 pass
316
317 - def started(self, reply):
318 """ 319 Async request has started. 320 @param reply: The request. 321 @type reply: L{Started}. 322 """ 323 pass
324
325 - def progress(self, reply):
326 """ 327 Async progress report. 328 @param reply: The request. 329 @type reply: L{Progress}. 330 """ 331 pass
332 333
334 -class WatchDog:
335 """ 336 A watchdog object used to track asynchronous messages 337 by serial number. Tracking is persisted using journal files. 338 @ivar url: The AMQP broker URL. 339 @type url: str 340 @ivar __jnl: A journal use for persistence. 341 @type __jnl: L{Journal} 342 @ivar __producer: An AMQP message producer. 343 @type __producer: L{Producer} 344 @ivar __run: Run flag. 345 @type __run: bool 346 """ 347 348 __metaclass__ = Singleton 349 350 URL = Producer.LOCALHOST 351
352 - def __init__(self, url=URL, journal=None):
353 """ 354 @param url: The (optional) broker URL. 355 @type url: str 356 @param journal: A journal object (default: Journal()). 357 @type journal: L{Journal} 358 """ 359 self.url = url 360 self.__producer = None 361 self.__jnl = (journal or Journal())
362
363 - def start(self):
364 """ 365 Start a watchdog thread. 366 @return: The started thread. 367 @rtype: L{WatchDogThread} 368 """ 369 thread = WatchDogThread(self) 370 thread.start() 371 return thread
372
373 - def track(self, sn, replyto, any, timeout):
374 """ 375 Add a request by serial number for tacking. 376 @param sn: A serial number. 377 @type sn: str 378 @param replyto: An AMQP address. 379 @type replyto: str 380 @param any: User defined data. 381 @type any: any 382 @param timeout: A timeout (start,complete) 383 @type timeout: tuple(2) 384 """ 385 now = time() 386 ts = (now+timeout[0], now+timeout[1]) 387 je = self.__jnl.write(sn, replyto, any, ts) 388 log.info('tracking: %s', je)
389
390 - def started(self, sn):
391 """ 392 Timeout is a tuple of: (start,complete). 393 A proper status='started' has been received and the timout 394 index is changed from 0 to 1. This switches the timeout logic 395 to work off the 2nd timeout which indicates the completion timeout. 396 @param sn: An entry serial number. 397 @type sn: str 398 """ 399 log.info(sn) 400 je = self.__jnl.find(sn) 401 if je: 402 self.__jnl.update(sn, idx=1) 403 else: 404 pass # ignored
405
406 - def progress(self, sn):
407 """ 408 Progress reporting received. 409 Because a progress report has been received, the 410 current timestamp is bumped 5 seconds only if the timestamp 411 is within 5 seconds of expiration. 412 """ 413 log.info(sn) 414 je = self.__jnl.find(sn) 415 if not je: 416 # ignored 417 return 418 if je.idx != 1: 419 # invalid state 420 return 421 grace_period = time()+5 # seconds 422 if grace_period > je.ts[1]: 423 ts = (je.ts[0], grace_period) 424 self.__jnl.update(sn, ts=ts)
425
426 - def completed(self, sn):
427 """ 428 The request has been properly completed by the agent. 429 Tracking is discontinued. 430 @param sn: An entry serial number. 431 @type sn: str 432 """ 433 log.info(sn) 434 self.__jnl.delete(sn)
435
436 - def process(self):
437 """ 438 Process all I{outstanding} journal entries. 439 When a journal entry (timeout) is detected, a L{RequestTimeout} 440 exception is raised and sent to the I{replyto} AMQP address. 441 The journal entry is deleted. 442 """ 443 sent = [] 444 now = time() 445 if self.__producer is None: 446 self.__producer = Producer(url=self.url) 447 for sn,je in self.__jnl.load().items(): 448 if now > je.ts[je.idx]: 449 sent.append(sn) 450 try: 451 raise RequestTimeout(sn, je.idx) 452 except: 453 self.__overdue(je) 454 for sn in sent: 455 self.__jnl.delete(sn)
456
457 - def __overdue(self, je):
458 """ 459 Send the (timeout) reply to the I{replyto} AMQP address 460 specified in the journal entry. 461 @param je: A journal entry. 462 @type je: Entry 463 """ 464 log.info('sn:%s timeout detected', je.sn) 465 try: 466 self.__sendreply(je) 467 except: 468 log.exception(str(je))
469
470 - def __sendreply(self, je):
471 """ 472 Send the (timeout) reply to the I{replyto} AMQP address 473 specified in the journal entry. 474 @param je: A journal entry. 475 @type je: Entry 476 """ 477 sn = je.sn 478 replyto = je.replyto 479 any = je.any 480 result = Return.exception() 481 log.info('send (timeout) for sn:%s to:%s', sn, replyto) 482 self.__producer.send( 483 replyto, 484 sn=sn, 485 any=any, 486 result=result, 487 watchdog=self.__producer.uuid)
488 489
490 -class WatchDogThread(Thread):
491 """ 492 Watchdog thread. 493 """ 494
495 - def __init__(self, watchdog):
496 Thread.__init__(self, name='watchdog') 497 self.watchdog = watchdog 498 self.__run = True 499 self.setDaemon(True)
500
501 - def run(self):
502 watchdog = self.watchdog 503 while self.__run: 504 try: 505 watchdog.process() 506 sleep(1) 507 except: 508 log.exception(self.getName()) 509 sleep(3)
510
511 - def stop(self):
512 self.__run = False 513 return self
514 515
516 -class Journal:
517 """ 518 Async message journal 519 @ivar root: The root journal directory. 520 @type root: str 521 @cvar ROOT: The default journal directory root. 522 @type ROOT: str 523 Entry: 524 - sn: serial number 525 - replyto: reply to amqp address. 526 - any: user data 527 - timeout: (start<ctime>, complete<ctime>) 528 - idx: current timeout index. 529 """ 530 531 ROOT = '/tmp/%s/journal/watchdog' % NAME 532
533 - def __init__(self, root=ROOT):
534 """ 535 @param root: A journal root directory path. 536 @type root: str 537 """ 538 self.root = root 539 self.__mkdir()
540
541 - def load(self):
542 """ 543 Load all journal entries. 544 @return: A dict of journal entries. 545 @rtype: dict 546 """ 547 entries = {} 548 for fn in os.listdir(self.root): 549 path = os.path.join(self.root, fn) 550 if os.path.isdir(path): 551 continue 552 je = self.__read(path) 553 if not je: 554 continue 555 entries[je.sn] = je 556 return entries
557
558 - def write(self, sn, replyto, any, ts):
559 """ 560 Write a new journal entry. 561 @param sn: A serial number. 562 @type sn: str 563 @param replyto: An AMQP address. 564 @type replyto: str 565 @param any: User defined data. 566 @type any: any 567 @param ts: A timeout (start<ctime>,complete<ctime>) 568 @type ts: tuple(2) 569 """ 570 je = Envelope( 571 sn=sn, 572 replyto=replyto, 573 any=any, 574 ts=ts, 575 idx=0) 576 self.__write(je) 577 return je
578
579 - def update(self, sn, **property):
580 """ 581 Update a journal entry for the specified I{sn}. 582 @param sn: An entry serial number. 583 @type sn: str 584 @param property: properties to update. 585 @type property: dict 586 @return: The updated journal entry 587 @rtype: Entry 588 @raise KeyError: On invalid key. 589 """ 590 je = self.find(sn) 591 if not je: 592 return None 593 for k,v in property.items(): 594 if k in ('sn',): 595 continue 596 if k in je: 597 je[k] = v 598 else: 599 raise KeyError(k) 600 self.__write(je) 601 return je
602
603 - def delete(self, sn):
604 """ 605 Delete a journal entry by serial number. 606 @param sn: An entry serial number. 607 @type sn: str 608 """ 609 fn = self.__fn(sn) 610 path = os.path.join(self.root, fn) 611 self.__unlink(path)
612
613 - def find(self, sn):
614 """ 615 Find a journal entry my serial number. 616 @param sn: An entry serial number. 617 @type sn: str 618 @return: The journal entry. 619 @rtype: Entry 620 """ 621 try: 622 fn = self.__fn(sn) 623 path = os.path.join(self.root, fn) 624 return self.__read(path) 625 except (IOError, OSError): 626 log.debug(sn, exc_info=1)
627
628 - def __fn(self, sn):
629 """ 630 File name. 631 @param sn: An entry serial number. 632 @type sn: str 633 @return: The journal file name by serial number. 634 @rtype: str 635 """ 636 return '%s.jnl' % sn
637
638 - def __read(self, path):
639 """ 640 Read the journal file at the specified I{path}. 641 @param path: A journal file path. 642 @type path: str 643 @return: A journal entry. 644 @rtype: Entry 645 """ 646 f = open(path) 647 try: 648 try: 649 je = Envelope() 650 je.load(f.read()) 651 return je 652 except: 653 log.exception(path) 654 self.__unlink(path) 655 finally: 656 f.close()
657
658 - def __write(self, je):
659 """ 660 Write the specified journal entry. 661 @param je: A journal entry 662 @type je: Entry 663 """ 664 path = os.path.join(self.root, self.__fn(je.sn)) 665 f = open(path, 'w') 666 try: 667 f.write(je.dump()) 668 finally: 669 f.close
670 681
682 - def __mkdir(self):
683 """ 684 Ensure the directory exists. 685 """ 686 if not os.path.exists(self.root): 687 os.makedirs(self.root)
688 689
690 -class LazyDog:
691 """ 692 A lazy (good-for-nothing) watchdog. 693 Basically a watchdog that does not do anything. 694 """ 695
696 - def track(self, sn, replyto, any, timeout):
697 pass
698
699 - def started(self, sn):
700 pass
701
702 - def progress(self, sn):
703 pass
704
705 - def completed(self, sn):
706 pass
707