Package gofer :: Package rmi :: Module policy
[hide private]
[frames] | no frames]

Source Code for Module gofer.rmi.policy

  1  # 
  2  # Copyright (c) 2011 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU Lesser General Public 
  5  # License as published by the Free Software Foundation; either version 
  6  # 2 of the License (LGPLv2) or (at your option) any later version. 
  7  # There is NO WARRANTY for this software, express or implied, 
  8  # including the implied warranties of MERCHANTABILITY, 
  9  # NON-INFRINGEMENT, or FITNESS FOR A PARTICULAR PURPOSE. You should 
 10  # have received a copy of LGPLv2 along with this software; if not, see 
 11  # http://www.gnu.org/licenses/old-licenses/lgpl-2.0.txt. 
 12  # 
 13  # Jeff Ortel <jortel@redhat.com> 
 14  # 
 15   
 16  """ 
 17  Contains request delivery policies. 
 18  """ 
 19   
 20  from gofer.messaging import * 
 21  from gofer.rmi.dispatcher import * 
 22  from gofer.metrics import Timer 
 23  from gofer.messaging.consumer import Reader 
 24  from logging import getLogger 
 25   
 26  log = getLogger(__name__) 
#
# Utils
#

def timeout(options, none=(None,None)):
    """
    Resolve the I{timeout} policy option into a (start, duration) pair.
    A list or tuple option is interpreted as (<start>,<duration>); any
    other value is used for both the start and the duration.
    @param options: Policy options; only the I{timeout} attribute is read.
    @type options: object
    @param none: The value returned when no timeout has been specified.
    @type none: tuple
    @return: The timeout (<start>,<duration>)
    @rtype: tuple
    """
    specified = options.timeout
    if specified is None:
        return none
    if isinstance(specified, (list, tuple)):
        start_and_duration = specified
    else:
        start_and_duration = (specified, specified)
    return Timeout(*start_and_duration).tuple()
48
try:
    _STRING_TYPES = (basestring,)
except NameError:
    # Python 3 has no basestring; str is the only text type.
    _STRING_TYPES = (str,)


class Timeout:
    """
    Policy timeout.
    @cvar SECOND: Second in seconds.
    @cvar MINUTE: Minute in seconds.
    @cvar HOUR: Hour in seconds.
    @cvar DAY: Day in seconds.
    @cvar SUFFIX: Suffix to multiplier mapping.
    @ivar start: Seconds allowed for the request to be started.
    @type start: (None|int)
    @ivar duration: Seconds allowed for the request to complete.
    @type duration: (None|int)
    """

    SECOND = 1
    MINUTE = 60
    HOUR = (MINUTE * 60)
    DAY = (HOUR * 24)

    SUFFIX = {
        's' : SECOND,
        'm' : MINUTE,
        'h' : HOUR,
        'd' : DAY,
    }

    @classmethod
    def seconds(cls, tm):
        """
        Convert tm to seconds based on suffix.
        @param tm: A timeout value.
            The string value may have a suffix of:
              (s) = seconds
              (m) = minutes
              (h) = hours
              (d) = days
        @type tm: (None|int|float|str)
        @return: The timeout in seconds.  None is passed through
            unchanged and a float is truncated to an int.
        @rtype: (None|int)
        @raise TypeError: When tm is not one of the supported types.
        @raise ValueError: When tm is an empty string or its numeric
            part cannot be parsed as an integer.
        """
        if tm is None:
            return tm
        if isinstance(tm, int):
            return tm
        if isinstance(tm, float):
            return int(tm)
        if not isinstance(tm, _STRING_TYPES):
            raise TypeError(tm)
        if not len(tm):
            raise ValueError(tm)
        if cls.has_suffix(tm):
            # Every suffix is a single character.
            multiplier = cls.SUFFIX[tm[-1]]
            return (multiplier * int(tm[:-1]))
        return int(tm)

    @classmethod
    def has_suffix(cls, tm):
        """
        Get whether the string ends with one of the known unit suffixes.
        @param tm: A timeout value.
        @type tm: str
        @return: True when tm ends with a key of L{SUFFIX}.
        @rtype: bool
        """
        return tm.endswith(tuple(cls.SUFFIX))

    def __init__(self, start=None, duration=None):
        """
        @param start: The time allowed for the request to be started.
        @type start: (None|int|float|str)
        @param duration: The time allowed for the request to complete.
        @type duration: (None|int|float|str)
        """
        self.start = self.seconds(start)
        self.duration = self.seconds(duration)

    def tuple(self):
        """
        Get the timeout as a tuple.
        @return: The timeout (<start>,<duration>) in seconds.
        @rtype: tuple
        """
        return (self.start, self.duration)
113
#
# Exceptions
#

class RequestTimeout(Exception):
    """
    Raised when a request is not answered in time.
    The serial number and the index are stored (in that order) as
    the exception arguments.
    """

    def __init__(self, sn, index):
        """
        @param sn: The request serial number.
        @type sn: str
        @param index: Identifies which timeout expired.  Within this
            module: 0 = waiting for the request to be started and
            1 = waiting for the reply.
        @type index: int
        """
        super(RequestTimeout, self).__init__(sn, index)

    def sn(self):
        """
        @return: The serial number of the request that timed out.
        @rtype: str
        """
        return self.args[0]

    def index(self):
        """
        @return: The index of the timeout that expired.
        @rtype: int
        """
        return self.args[1]
135
#
# Policy
#

class RequestMethod:
    """
    Common base for the request delivery methods.
    Both L{send} and L{broadcast} are no-ops here and are
    intended to be overridden by subclasses.
    @ivar producer: A queue producer.
    @type producer: L{gofer.messaging.producer.Producer}
    """

    def __init__(self, producer):
        """
        @param producer: A queue producer.
        @type producer: L{gofer.messaging.producer.Producer}
        """
        self.producer = producer

    def send(self, address, request, **any):
        """
        Send the request to a single destination.
        The base implementation does nothing.
        @param address: The destination queue address.
        @type address: str
        @param request: A request to send.
        @type request: object
        @keyword any: Any (extra) data.
        """
        return None

    def broadcast(self, addresses, request, **any):
        """
        Send the request to several destinations.
        The base implementation does nothing.
        @param addresses: A list of destination queue addresses.
        @type addresses: [str,..]
        @param request: A request to send.
        @type request: object
        @keyword any: Any (extra) data.
        """
        return None
176
class Synchronous(RequestMethod):
    """
    The synchronous request method.
    This method blocks until a reply is received.
    @cvar TIMEOUT: The default timeout (<start>,<duration>) used when
        the options do not specify one.
    @type TIMEOUT: tuple
    @ivar timeout: The effective timeout (<start>,<duration>).
    @type timeout: tuple
    @ivar queue: The non-durable queue on which replies are received.
    @type queue: L{Queue}
    @ivar progress: An optional callback invoked with progress reports.
    @type progress: callable
    """

    TIMEOUT = (10, 90)

    def __init__(self, producer, options):
        """
        @param producer: A queue producer.
        @type producer: L{gofer.messaging.producer.Producer}
        @param options: Policy options.
        @type options: dict
        """
        self.timeout = timeout(options, self.TIMEOUT)
        self.queue = Queue(getuuid(), durable=False)
        self.progress = options.progress
        RequestMethod.__init__(self, producer)

    def send(self, destination, request, **any):
        """
        Send the request then read the reply.
        @param destination: The destination queue address.
        @type destination: str
        @param request: A request to send.
        @type request: object
        @keyword any: Any (extra) data.
        @return: The result of the request.
        @rtype: object
        @raise RequestTimeout: When the request is not started, or the
            reply is not received, within the configured timeout.
        @raise Exception: returned by the peer.
        """
        sn = self.producer.send(
            destination,
            ttl=self.timeout[0],
            replyto=str(self.queue),
            request=request,
            **any)
        log.info('sent (%s):\n%s', repr(destination), request)
        reader = Reader(self.queue, url=self.producer.url)
        reader.open()
        try:
            early = self.__getstarted(sn, reader)
            if early is not None:
                # The peer replied without first reporting STARTED.  That
                # envelope has already been consumed so it must be used as
                # the reply; waiting for another one would only time out.
                return self.__onreply(early)
            return self.__getreply(sn, reader)
        finally:
            reader.close()

    def __getstarted(self, sn, reader):
        """
        Get the STARTED reply matched by serial number.
        @param sn: The request serial number.
        @type sn: str
        @param reader: A reader.
        @type reader: L{Reader}
        @return: None when the request was reported as started; otherwise
            the envelope that was received in place of the STARTED status.
        @rtype: L{Envelope}
        @raise RequestTimeout: When nothing is received within the
            I{start} timeout.
        """
        envelope = reader.search(sn, self.timeout[0])
        if not envelope:
            raise RequestTimeout(sn, 0)
        reader.ack()
        if envelope.status == 'started':
            log.debug('request (%s), started', sn)
            return None
        return envelope

    def __getreply(self, sn, reader):
        """
        Get the reply matched by serial number.
        Progress reports received while waiting are passed to the
        progress callback.  The time spent on each read is deducted
        from the I{duration} timeout.
        @param sn: The request serial number.
        @type sn: str
        @param reader: A reader.
        @type reader: L{Reader}
        @return: The result of the request.
        @rtype: object
        @raise RequestTimeout: When the reply is not received within
            the I{duration} timeout.
        @raise Exception: returned by the peer.
        """
        timer = Timer()
        remaining = float(self.timeout[1])
        while True:
            timer.start()
            envelope = reader.search(sn, int(remaining))
            if envelope:
                reader.ack()
            timer.stop()
            elapsed = timer.duration()
            if elapsed > remaining:
                raise RequestTimeout(sn, 1)
            else:
                remaining -= elapsed
            if envelope:
                if envelope.status == 'progress':
                    self.__onprogress(envelope)
                else:
                    return self.__onreply(envelope)
            else:
                raise RequestTimeout(sn, 1)

    def __onreply(self, envelope):
        """
        Handle the reply.
        @param envelope: The reply envelope.
        @type envelope: L{Envelope}
        @return: The value returned by the remote method.
        @rtype: object
        @raise Exception: returned by the peer.
        """
        reply = Return(envelope.result)
        if reply.succeeded():
            return reply.retval
        else:
            raise RemoteException.instance(reply)

    def __onprogress(self, envelope):
        """
        Handle the progress report.
        A failure in the callback is logged and otherwise ignored so
        that it cannot abort the request.
        @param envelope: The status envelope.
        @type envelope: L{Envelope}
        """
        try:
            callback = self.progress
            if callable(callback):
                report = dict(
                    sn=envelope.sn,
                    any=envelope.any,
                    total=envelope.total,
                    completed=envelope.completed,
                    details=envelope.details)
                callback(report)
        except Exception:
            # Not a bare except: KeyboardInterrupt and SystemExit must
            # still be able to interrupt a blocked caller.
            log.error('progress callback failed', exc_info=1)
310
class Asynchronous(RequestMethod):
    """
    The asynchronous request method.
    Requests are sent without waiting for the reply.
    @ivar ctag: The reply I{correlation} tag.
    @ivar timeout: The timeout (<start>,<duration>).
    @type timeout: tuple
    @ivar trigger: The trigger option; (1) specifies a I{manual} trigger.
    @ivar watchdog: An optional watchdog used to track requests.
    """

    def __init__(self, producer, options):
        """
        @param producer: A queue producer.
        @type producer: L{gofer.messaging.producer.Producer}
        @param options: Policy options.
        @type options: dict
        """
        RequestMethod.__init__(self, producer)
        self.ctag = options.ctag
        self.timeout = timeout(options)
        self.trigger = options.trigger
        self.watchdog = options.watchdog

    def send(self, destination, request, **any):
        """
        Send the specified request and redirect the reply to the
        queue for the specified reply I{correlation} tag.
        A trigger(1) specifies a I{manual} trigger.
        @param destination: The AMQP destination.
        @type destination: L{Destination}
        @param request: A request to send.
        @type request: object
        @keyword any: Any (extra) data.
        @return: The request serial number, or the (unexecuted)
            L{Trigger} when a manual trigger is specified.
        @rtype: str
        """
        pending = Trigger(self, destination, request, any)
        if self.trigger == 1:
            return pending
        pending()
        return pending.sn

    def broadcast(self, destinations, request, **any):
        """
        Send the specified request to each destination and redirect
        the replies to the queue for the specified reply
        I{correlation} tag.
        A trigger(1) specifies a I{manual} trigger.
        @param destinations: A list of destinations.
        @type destinations: [L{Destination},..]
        @param request: A request to send.
        @type request: object
        @keyword any: Any (extra) data.
        @return: The request serial numbers, or the (unexecuted)
            L{Trigger} objects when a manual trigger is specified.
        @rtype: list
        """
        triggers = [Trigger(self, d, request, any) for d in destinations]
        if self.trigger == 1:
            return triggers
        for pending in triggers:
            pending()
        return [t.sn for t in triggers]

    def replyto(self):
        """
        Get replyto based on the correlation I{tag}.
        @return: The replyto AMQP address, or None when
            no correlation tag has been specified.
        @rtype: str
        """
        if not self.ctag:
            return None
        return str(Queue(self.ctag))

    def notifywatchdog(self, sn, replyto, any):
        """
        Add the request to the I{watchdog} for tracking.
        Nothing is tracked unless a replyto address, a correlation
        tag, both timeout values and a watchdog are all available.
        @param sn: A serial number.
        @type sn: str
        @param replyto: An AMQP address.
        @type replyto: str
        @param any: User defined data.
        @type any: any
        """
        envelope = Envelope(any)
        if not replyto or not self.ctag:
            return
        if self.timeout[0] is None or self.timeout[1] is None:
            return
        if self.watchdog is None:
            return
        self.watchdog.track(sn, replyto, envelope.any, self.timeout)
404
class Trigger:
    """
    Asynchronous trigger.
    Calling the trigger sends the request; it may be executed only once.
    @ivar __pending: pending flag.
    @type __pending: bool
    @ivar __sn: serial number
    @type __sn: str
    @ivar __policy: The policy object.
    @type __policy: L{Asynchronous}
    @ivar __destination: The AMQP destination.
    @type __destination: L{Destination}
    @ivar __request: A request to send.
    @type __request: object
    @ivar __any: Any (extra) data.
    @type __any: dict
    """

    def __init__(self, policy, destination, request, any):
        """
        @param policy: The policy object.
        @type policy: L{Asynchronous}
        @param destination: The AMQP destination.
        @type destination: L{Destination}
        @param request: A request to send.
        @type request: object
        @param any: Any (extra) data.
        @type any: dict
        """
        self.__pending = True
        self.__sn = getuuid()
        self.__policy = policy
        self.__destination = destination
        self.__request = request
        self.__any = any

    @property
    def sn(self):
        """
        Get serial number.
        @return: The request serial number.
        @rtype: str
        """
        return self.__sn

    def __send(self):
        """
        Send the request using the specified policy object and
        generated serial number, then notify the policy watchdog.
        """
        policy = self.__policy
        replyto = policy.replyto()
        policy.producer.send(
            self.__destination,
            sn=self.__sn,
            ttl=policy.timeout[0],
            replyto=replyto,
            request=self.__request,
            **self.__any)
        log.info('sent (%s):\n%s', repr(self.__destination), self.__request)
        policy.notifywatchdog(self.__sn, replyto, self.__any)

    def __str__(self):
        return self.__sn

    def __call__(self):
        """
        Execute the trigger (send the request).
        @raise Exception: When the trigger has already been executed.
        """
        if not self.__pending:
            raise Exception('trigger already executed')
        self.__send()
        # Cleared only after a successful send.
        self.__pending = False
478