1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 Contains request delivery policies.
18 """
19
20 from gofer.messaging import *
21 from gofer.rmi.dispatcher import *
22 from gofer.metrics import Timer
23 from gofer.messaging.consumer import Reader
24 from logging import getLogger
25
26 log = getLogger(__name__)
27
28
29
30
31
def timeout(options, none=(None,None)):
    """
    Extract (and default as necessary) the timeout option.
    A list/tuple value is unpacked into L{Timeout} as (start, duration);
    any other non-None value is used for both the start and the duration.
    @param options: Policy options.  Read by attribute (options.timeout).
    @type options: dict
    @param none: The value returned when options.timeout is None.
    @type none: tuple
    @return: The timeout (<start>,<duration>)
    @rtype: tuple
    """
    tm = options.timeout
    if tm is None:
        return none
    # 'limit' rather than 'timeout': avoid shadowing this function's name.
    if isinstance(tm, (list, tuple)):
        limit = Timeout(*tm)
    else:
        limit = Timeout(tm, tm)
    return limit.tuple()
48
51 """
52 Policy timeout.
53 @cvar MINUTE: Minutes in seconds.
54 @cvar HOUR: Hour in seconds
55 @cvar DAY: Day in seconds
56 @cvar SUFFIX: Suffix to multiplier mapping.
57 """
58
59 SECOND = 1
60 MINUTE = 60
61 HOUR = (MINUTE * 60)
62 DAY = (HOUR * 24)
63
64 SUFFIX = {
65 's' : SECOND,
66 'm' : MINUTE,
67 'h' : HOUR,
68 'd' : DAY,
69 }
70
71 @classmethod
73 """
74 Convert tm to seconds based on suffix.
75 @param tm: A timeout value.
76 The string value may have a suffix of:
77 (s) = seconds
78 (m) = minutes
79 (h) = hours
80 (d) = days
81 @type tm: (None|int|float|str)
82
83 """
84 if tm is None:
85 return tm
86 if isinstance(tm, int):
87 return tm
88 if isinstance(tm, float):
89 return int(tm)
90 if not isinstance(tm, (basestring)):
91 raise TypeError(tm)
92 if not len(tm):
93 raise ValueError(tm)
94 if cls.has_suffix(tm):
95 multiplier = cls.SUFFIX[tm[-1]]
96 return (multiplier * int(tm[:-1]))
97 else:
98 return int(tm)
99
@classmethod
def has_suffix(cls, tm):
    """
    Get whether I{tm} ends with one of the suffixes defined in SUFFIX.
    @param tm: A timeout value.
    @type tm: str
    @return: True if I{tm} ends with a known suffix, else False.
    @rtype: bool
    """
    # NOTE(review): the ``def`` line was absent from this listing; the
    # signature is restored from the cls.has_suffix(tm) call site.
    # str.endswith() accepts a tuple of candidate suffixes.
    return tm.endswith(tuple(cls.SUFFIX))
106
107 - def __init__(self, start=None, duration=None):
110
113
119 """
120 Request timeout.
121 """
122
def __init__(self, sn, index):
    """
    @param sn: The request serial number.
    @type sn: str
    @param index: Identifies which timeout expired.  Callers in this
        module pass 0 when no reply arrives within timeout[0] and 1
        when the timeout[1] budget is exhausted.
    @type index: int
    """
    # NOTE(review): the ``def`` line was absent from this listing; the
    # signature is restored from the arguments forwarded below.
    Exception.__init__(self, sn, index)
129
132
135
142 """
143 Base class for request methods.
144 @ivar producer: A queue producer.
145 @type producer: L{gofer.messaging.producer.Producer}
146 """
147
def __init__(self, producer):
    """
    @param producer: A queue producer.
    @type producer: L{gofer.messaging.producer.Producer}
    """
    # NOTE(review): the ``def`` line was absent from this listing; the
    # signature is restored from RequestMethod.__init__(self, producer).
    self.producer = producer
154
def send(self, address, request, **any):
    """
    Send the request.
    This base implementation does nothing and returns None.
    @param address: The destination queue address.
    @type address: str
    @param request: A request to send.
    @type request: object
    @keyword any: Any (extra) data.
    """
    pass
165
def broadcast(self, addresses, request, **any):
    """
    Broadcast the request.
    This base implementation does nothing and returns None.
    @param addresses: A list of destination queue addresses.
    @type addresses: [str,..]
    @param request: A request to send.
    @type request: object
    @keyword any: Any (extra) data.
    """
    pass
176
179 """
180 The synchronous request method.
181 This method blocks until a reply is received.
182 @ivar reader: A queue reader used to read the reply.
183 @type reader: L{gofer.messaging.consumer.Reader}
184 """
185
186 TIMEOUT = (10, 90)
187
199
def send(self, destination, request, **any):
    """
    Send the request then read the reply.
    The request is sent with a TTL of timeout[0] and a replyto of
    self.queue.  A reader is then opened on that queue and is always
    closed before returning or raising.
    @param destination: The destination queue address.
    @type destination: str
    @param request: A request to send.
    @type request: object
    @keyword any: Any (extra) data.
    @return: The result of the request.
    @rtype: object
    @raise RequestTimeout: When no matching reply is read in time.
    @raise Exception: returned by the peer.
    """
    sn = self.producer.send(
        destination,
        ttl=self.timeout[0],
        replyto=str(self.queue),
        request=request,
        **any)
    log.info('sent (%s):\n%s', repr(destination), request)
    reader = Reader(self.queue, url=self.producer.url)
    # Opened outside the try: close() only runs for a reader that opened.
    reader.open()
    try:
        self.__getstarted(sn, reader)
        return self.__getreply(sn, reader)
    finally:
        reader.close()
226
def __getstarted(self, sn, reader):
    """
    Wait for the first reply matched by serial number, which is
    expected to be the STARTED status.
    @param sn: The request serial number.
    @type sn: str
    @param reader: A reader.
    @type reader: L{Reader}
    @raise RequestTimeout: (index=0) when no reply is read within
        timeout[0].
    """
    # NOTE(review): the ``def`` line was absent from this listing; the
    # signature is restored from the self.__getstarted(sn, reader) call
    # made by send().
    envelope = reader.search(sn, self.timeout[0])
    if not envelope:
        raise RequestTimeout(sn, 0)
    reader.ack()
    if envelope.status == 'started':
        log.debug('request (%s), started', sn)
    else:
        # NOTE(review): the value returned by __onreply() is discarded
        # here and send() goes on to wait for another reply -- confirm
        # this is intended for a non-'started' first reply.
        self.__onreply(envelope)
246
def __getreply(self, sn, reader):
    """
    Get the reply matched by serial number.
    Replies with status 'progress' are passed to the progress handler
    and the wait continues; any other reply ends the wait.  The time
    spent in each search is deducted from the timeout[1] budget.
    @param sn: The request serial number.
    @type sn: str
    @param reader: A reader.
    @type reader: L{Reader}
    @return: The value returned by the reply handler.
    @rtype: object
    @raise RequestTimeout: (index=1) when the budget is exhausted or a
        search returns nothing.
    """
    # NOTE(review): the ``def`` line was absent from this listing; the
    # signature is restored from the self.__getreply(sn, reader) call
    # made by send().
    timer = Timer()
    # 'remaining' rather than 'timeout': avoid shadowing the module-level
    # timeout() function.
    remaining = float(self.timeout[1])
    while True:
        timer.start()
        # The search is given whole seconds (fraction truncated).
        envelope = reader.search(sn, int(remaining))
        if envelope:
            reader.ack()
        timer.stop()
        elapsed = timer.duration()
        if elapsed > remaining:
            raise RequestTimeout(sn, 1)
        remaining -= elapsed
        if not envelope:
            raise RequestTimeout(sn, 1)
        if envelope.status == 'progress':
            self.__onprogress(envelope)
        else:
            return self.__onreply(envelope)
277
def __onreply(self, envelope):
    """
    Handle the reply.
    @param envelope: The reply envelope.
    @type envelope: L{Envelope}
    @return: The returned value (retval) carried by a succeeded reply.
    @rtype: object
    @raise Exception: The exception built by L{RemoteException}.instance()
        when the reply did not succeed.
    """
    # NOTE(review): the ``def`` line was absent from this listing; the
    # signature is restored from the self.__onreply(envelope) call sites.
    reply = Return(envelope.result)
    if reply.succeeded():
        return reply.retval
    raise RemoteException.instance(reply)
291
def __onprogress(self, envelope):
    """
    Handle the progress report.
    When self.progress is callable, it is called with a dict containing:
    sn, any, total, completed and details taken from the envelope.
    A failure in the callback is logged and not propagated.
    @param envelope: The status envelope.
    @type envelope: L{Envelope}
    """
    # NOTE(review): the ``def`` line was absent from this listing; the
    # signature is restored from the self.__onprogress(envelope) call site.
    try:
        callback = self.progress
        if callable(callback):
            report = dict(
                sn=envelope.sn,
                any=envelope.any,
                total=envelope.total,
                completed=envelope.completed,
                details=envelope.details)
            callback(report)
    except Exception:
        # Best effort: a broken callback must not abort the request, but
        # KeyboardInterrupt/SystemExit are allowed to propagate.
        log.error('progress callback failed', exc_info=1)
310
313 """
314 The asynchronous request method.
315 """
316
def __init__(self, producer, options):
    """
    @param producer: A queue producer.
    @type producer: L{gofer.messaging.producer.Producer}
    @param options: Policy options.  The ctag, timeout, trigger and
        watchdog options are read by attribute.
    @type options: dict
    """
    # NOTE(review): the ``def`` line was absent from this listing; the
    # signature is restored from the names used below and the parameter
    # order given in the docstring.
    RequestMethod.__init__(self, producer)
    self.ctag = options.ctag
    self.timeout = timeout(options)
    self.trigger = options.trigger
    self.watchdog = options.watchdog
329
def send(self, destination, request, **any):
    """
    Send the specified request and redirect the reply to the
    queue for the specified reply I{correlation} tag.
    A trigger(1) specifies a I{manual} trigger.
    @param destination: The AMQP destination.
    @type destination: L{Destination}
    @param request: A request to send.
    @type request: object
    @keyword any: Any (extra) data.
    @return: The request serial number; or, when the trigger option
        is 1 (manual), the L{Trigger} that has not yet been called.
    @rtype: str|L{Trigger}
    """
    trigger = Trigger(self, destination, request, any)
    if self.trigger == 1:
        # Manual: the caller is responsible for calling the trigger.
        return trigger
    trigger()
    return trigger.sn
348
def broadcast(self, destinations, request, **any):
    """
    Send the specified request and redirect the reply to the
    queue for the specified reply I{correlation} tag.
    A trigger(1) specifies a I{manual} trigger.
    One L{Trigger} is created per destination; all of them are
    created before any is called.
    @param destinations: A list of destinations.
    @type destinations: [L{Destination},..]
    @param request: A request to send.
    @type request: object
    @keyword any: Any (extra) data.
    @return: The list of request serial numbers; or, when the trigger
        option is 1 (manual), the list of L{Trigger} not yet called.
    @rtype: list
    """
    # Every trigger is handed the same I{any} dict object.
    triggers = [Trigger(self, d, request, any) for d in destinations]
    if self.trigger == 1:
        # Manual: the caller is responsible for calling each trigger.
        return triggers
    for trigger in triggers:
        trigger()
    return [t.sn for t in triggers]
369
370
372 """
373 Get replyto based on the correlation I{tag}.
374 @return: The replyto AMQP address.
375 @rtype: str
376 """
377 if self.ctag:
378 queue = Queue(self.ctag)
379 return str(queue)
380 else:
381 return None
382
384 """
385 Add the request to the I{watchdog} for tracking.
386 @param sn: A serial number.
387 @type sn: str
388 @param replyto: An AMQP address.
389 @type replyto: str
390 @param any: User defined data.
391 @type any: any
392 """
393 any = Envelope(any)
394 if replyto and \
395 self.ctag and \
396 self.timeout[0] is not None and \
397 self.timeout[1] is not None and \
398 self.watchdog is not None:
399 self.watchdog.track(
400 sn,
401 replyto,
402 any.any,
403 self.timeout)
404
407 """
408 Asynchronous trigger.
409 @ivar __pending: pending flag.
410 @type __pending: bool
411 @ivar __sn: serial number
412 @type __sn: str
413 @ivar __policy: The policy object.
414 @type __policy: L{Asynchronous}
415 @ivar __destination: The AMQP destination.
416 @type __destination: L{Destination}
417 @ivar __request: A request to send.
418 @type __request: object
419 @ivar __any: Any (extra) data.
420 @type __any: dict
421 """
422
def __init__(self, policy, destination, request, any):
    """
    Create a trigger that is pending (not yet called) and assign it
    a newly generated serial number.
    @param policy: The policy object.
    @type policy: L{Asynchronous}
    @param destination: The AMQP destination.
    @type destination: L{Destination}
    @param request: A request to send.
    @type request: object
    @param any: Any (extra) data.
    @type any: dict
    """
    self.__pending = True
    self.__sn = getuuid()
    self.__policy = policy
    self.__destination = destination
    self.__request = request
    self.__any = any
439
@property
def sn(self):
    """
    Get serial number.
    @return: The request serial number.
    @rtype: str
    """
    # NOTE(review): the ``def`` line was absent from this listing; the
    # signature is restored from the @property decorator and the
    # trigger.sn reads elsewhere in this module.
    return self.__sn
448
468
471
def __call__(self):
    """
    Execute the trigger: send the request and mark the trigger as no
    longer pending.  A trigger may be executed only once.
    @raise Exception: When the trigger has already been executed.
    """
    # NOTE(review): the ``def`` line was absent from this listing; the
    # signature is restored from the trigger() call sites elsewhere in
    # this module.
    if not self.__pending:
        raise Exception('trigger already executed')
    self.__send()
    # Cleared only after a successful send, so a failed send can be retried.
    self.__pending = False
478