1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 from time import time
17 from threading import local as Local
18
19 from gofer.rmi.window import *
20 from gofer.rmi.tracker import Tracker
21 from gofer.rmi.store import PendingThread
22 from gofer.rmi.dispatcher import Dispatcher, Return
23 from gofer.rmi.threadpool import Immediate
24 from gofer.messaging import Envelope
25 from gofer.messaging.producer import Producer
26 from gofer.metrics import Timer
27 from logging import getLogger
28
29 log = getLogger(__name__)
33 """
34 TTL expired.
35 """
36 pass
37
40 """
41 An RMI task to be scheduled on the plugin's thread pool.
42 @ivar plugin: A plugin.
43 @type plugin: Plugin
44 @ivar envelope: A gofer messaging envelope.
45 @type envelope: L{Envelope}
46 @ivar producer: An AMQP message producer.
47 @type producer: L{Producer}
48 @ivar window: The window in which the task is valid.
49 @type window: dict
50 @ivar ttl: The task time-to-live.
51 @type ttl: float
52 @ivar ts: Timestamp taken when the task was created.
53 @type ts: float
54 """
55
56 context = Local()
57
58 - def __init__(self, plugin, envelope, producer, commit):
59 """
Store the collaborators needed to run the request, copy the
window and TTL from the envelope, and record the creation time
(used later to measure elapsed time against the TTL).
60 @param plugin: A plugin.
61 @type plugin: Plugin
62 @param envelope: A gofer messaging envelope.
63 @type envelope: L{Envelope}
64 @param producer: An AMQP message producer.
65 @type producer: L{Producer}
66 @param commit: Transaction commit function.
67 @type commit: callable
68 """
69 self.plugin = plugin
70 self.envelope = envelope
71 self.producer = producer
72 self.commit = commit
73 self.window = envelope.window
74 self.ttl = envelope.ttl
75 self.ts = time()
76
91
111
113 """
114 Check the window.
115 @raise WindowPending: when window in the future.
116 @raise WindowMissed: when the window is in the past.
117 """
118 w = self.window
119 if not isinstance(w, dict):
120 return
121 window = Window(w)
122 envelope = self.envelope
123 if window.past():
124 raise WindowMissed(envelope.sn)
125
127 """
128 Check the TTL.
129 @raise Expired: When the TTL has expired.
130 """
131 ttl = self.ttl
132 if not isinstance(ttl, float):
133 return
134 elapsed = (time()-self.ts)
135 if elapsed > ttl:
136 raise Expired()
137
139 """
140 Send a status update if requested.
141 @param envelope: The received envelope.
142 @type envelope: L{Envelope}
143 """
144 sn = envelope.sn
145 any = envelope.any
146 replyto = envelope.replyto
147 if not replyto:
148 return
149 try:
150 self.producer.send(
151 replyto,
152 sn=sn,
153 any=any,
154 status='started')
155 except:
156 log.exception('send (started), failed')
157
159 """
160 Send the reply if requested.
161 @param envelope: The received envelope.
162 @type envelope: L{Envelope}
163 @param result: The request result.
164 @type result: object
165 """
166 sn = envelope.sn
167 any = envelope.any
168 ts = envelope.ts
169 now = time()
170 duration = Timer(ts, now)
171 replyto = envelope.replyto
172 log.info('%s processed in: %s', sn, duration)
173 if not replyto:
174 return
175 try:
176 self.producer.send(
177 replyto,
178 sn=sn,
179 any=any,
180 result=result)
181 except:
182 log.exception('send failed:\n%s', result)
183
186 """
187 An I{empty} plugin.
188 Used when the appropriate plugin cannot be found.
189 """
190
193
196
200
203 """
204 The pending request scheduler.
205 Processes the I{pending} queue.
206 @ivar plugins: A collection of loaded plugins.
207 @type plugins: list
208 @ivar producers: A cache of AMQP producers.
209 @type producers: dict
210 """
211
213 """
214 @param plugins: A collection of loaded plugins.
215 @type plugins: list
216 """
217 PendingThread.__init__(self)
218 self.plugins = plugins
219 self.producers = {}
220
222 """
223 Dispatch the specified envelope to the plugin that
224 provides the specified class.
225 @param envelope: A gofer messaging envelope.
226 @type envelope: L{Envelope}
227 """
228 url = envelope.url
229 plugin = self.findplugin(envelope)
230 task = Task(
231 plugin,
232 envelope,
233 self.producer(url),
234 self.commit)
235 pool = plugin.getpool()
236 pool.run(task)
237
239 """
240 Find the plugin that provides the class specified in
241 the I{request} embedded in the envelope. Returns
242 L{EmptyPlugin} when not found.
243 @param envelope: A gofer messaging envelope.
244 @type envelope: L{Envelope}
245 @return: The appropriate plugin.
246 @rtype: Plugin
247 """
248 request = Envelope(envelope.request)
249 classname = request.classname
250 for plugin in self.plugins:
251 if plugin.provides(classname):
252 return plugin
253 return EmptyPlugin()
254
256 """
257 Find the cached producer by URL, creating and caching one when not found.
258 @param url: The URL of the broker from which the request was received.
259 @type url: str
260 @return: The appropriate producer.
261 @rtype: L{Producer}
262 """
263 p = self.producers.get(url)
264 if p is None:
265 p = Producer(url=url)
266 self.producers[url] = p
267 return p
268
271 """
272 Remote method invocation context.
273 Provides call context to method implementations.
274 @cvar current: The current call context.
275 @type current: L{Local}
276 """
277
278 @classmethod
281
284 """
285 Provides support for progress reporting.
286 @ivar __task: The current task.
287 @type __task: L{Task}
288 @ivar total: The total work units.
289 @type total: int
290 @ivar completed: The completed work units.
291 @type completed: int
292 @ivar details: The reported details.
293 @type details: object
294 """
295
297 """
298 @param task: The current task.
299 @type task: L{Task}
300 """
301 self.__task = task
302 self.total = 0
303 self.completed = 0
304 self.details = {}
305
307 """
308 Send the progress report.
309 """
310 sn = self.__task.envelope.sn
311 any = self.__task.envelope.any
312 replyto = self.__task.envelope.replyto
313 if not replyto:
314 return
315 try:
316 self.__task.producer.send(
317 replyto,
318 sn=sn,
319 any=any,
320 status='progress',
321 total=self.total,
322 completed=self.completed,
323 details=self.details)
324 except:
325 log.exception('send (progress), failed')
326
329 """
330 A callable added to the Context and used
331 by plugin methods to check for cancellation.
332 @ivar tracker: The cancellation tracker.
333 @type tracker: L{Tracker}
334 """
335
337 """
338 @param sn: Serial number.
339 @type sn: str
340 """
341 self.sn = sn
342 self.tracker = Tracker()
343
346
349