Package gofer :: Package agent :: Module rmi
[hide private]
[frames | no frames]

Source Code for Module gofer.agent.rmi

  1  # 
  2  # Copyright (c) 2011 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU Lesser General Public 
  5  # License as published by the Free Software Foundation; either version 
  6  # 2 of the License (LGPLv2) or (at your option) any later version. 
  7  # There is NO WARRANTY for this software, express or implied, 
  8  # including the implied warranties of MERCHANTABILITY, 
  9  # NON-INFRINGEMENT, or FITNESS FOR A PARTICULAR PURPOSE. You should 
 10  # have received a copy of LGPLv2 along with this software; if not, see 
 11  # http://www.gnu.org/licenses/old-licenses/lgpl-2.0.txt. 
 12  # 
 13  # Jeff Ortel <jortel@redhat.com> 
 14  # 
 15   
 16  from time import time 
 17  from threading import local as Local 
 18   
 19  from gofer.rmi.window import * 
 20  from gofer.rmi.tracker import Tracker 
 21  from gofer.rmi.store import PendingThread 
 22  from gofer.rmi.dispatcher import Dispatcher, Return 
 23  from gofer.rmi.threadpool import Immediate 
 24  from gofer.messaging import Envelope 
 25  from gofer.messaging.producer import Producer 
 26  from gofer.metrics import Timer 
 27  from logging import getLogger 
 28   
 29  log = getLogger(__name__) 
class Expired(Exception):
    """
    Raised when the time-to-live (TTL) of a task has elapsed.
    """
37
class Task:
    """
    An RMI task to be scheduled on the plugin's thread pool.
    @cvar context: Thread-local storage populated with the call context
        (sn, progress, cancelled) while a request is being dispatched.
    @type context: L{Local}
    @ivar plugin: A plugin.
    @type plugin: Plugin
    @ivar envelope: A gofer messaging envelope.
    @type envelope: L{Envelope}
    @ivar producer: An AMQP message producer.
    @type producer: L{Producer}
    @ivar commit: Transaction commit function; called with the
        serial number of the request.
    @type commit: callable
    @ivar window: The window in which the task is valid.
    @type window: dict
    @ivar ttl: The task time-to-live.
    @type ttl: float
    @ivar ts: Timestamp taken when the task was created.
    @type ts: float
    """

    context = Local()

    def __init__(self, plugin, envelope, producer, commit):
        """
        @param plugin: A plugin.
        @type plugin: Plugin
        @param envelope: A gofer messaging envelope.
        @type envelope: L{Envelope}
        @param producer: An AMQP message producer.
        @type producer: L{Producer}
        @param commit: Transaction commit function.
        @type commit: callable
        """
        self.plugin = plugin
        self.envelope = envelope
        self.producer = producer
        self.commit = commit
        self.window = envelope.window
        self.ttl = envelope.ttl
        # The TTL is measured from the moment the task object is created.
        self.ts = time()

    def __call__(self, *args, **options):
        """
        Dispatch received request.
        The thread-local call context is populated for the duration
        of the dispatch and always reset afterwards.
        """
        envelope = self.envelope
        self.context.sn = envelope.sn
        self.context.progress = Progress(self)
        self.context.cancelled = Cancelled(envelope.sn)
        try:
            self.__call()
        finally:
            # Reset even on failure so nothing from this request
            # remains on the thread-local context.
            self.context.sn = None
            self.context.progress = None
            self.context.cancelled = None

    def __call(self):
        """
        Dispatch received request.
        On success: send the I{started} status, dispatch to the plugin,
        commit and send the reply.
        On L{Expired}: commit and log; no reply is sent.
        On L{WindowMissed}: commit, log and reply with the exception.
        Any other exception propagates to the caller.
        """
        envelope = self.envelope
        try:
            self.expired()
            self.missed()
            self.sendstarted(envelope)
            result = self.plugin.dispatch(envelope)
            self.commit(envelope.sn)
            self.sendreply(envelope, result)
        except Expired:
            self.commit(envelope.sn)
            log.info('expired:\n%s', envelope)
        except WindowMissed:
            self.commit(envelope.sn)
            log.info('window missed:\n%s', envelope)
            # Built inside the handler while WindowMissed is the
            # exception being handled.
            self.sendreply(envelope, Return.exception())

    def missed(self):
        """
        Check the window.
        The check is skipped unless the window is a dict.
        @raise WindowMissed: when the window is in the past.
        """
        w = self.window
        if not isinstance(w, dict):
            return
        window = Window(w)
        if window.past():
            raise WindowMissed(self.envelope.sn)

    def expired(self):
        """
        Check the TTL.
        The check is skipped unless the TTL is a float.
        @raise Expired: When TTL expired.
        """
        ttl = self.ttl
        if not isinstance(ttl, float):
            return
        elapsed = time() - self.ts
        if elapsed > ttl:
            raise Expired()

    def sendstarted(self, envelope):
        """
        Send a I{started} status update if requested.
        Nothing is sent when the envelope has no I{replyto}.
        A failed send is logged and otherwise ignored.
        @param envelope: The received envelope.
        @type envelope: L{Envelope}
        """
        replyto = envelope.replyto
        if not replyto:
            return
        try:
            self.producer.send(
                replyto,
                sn=envelope.sn,
                any=envelope.any,
                status='started')
        except Exception:
            # Best effort: a failed status update must not abort the
            # request.  KeyboardInterrupt/SystemExit are not swallowed.
            log.exception('send (started), failed')

    def sendreply(self, envelope, result):
        """
        Send the reply if requested.
        The processing duration is logged whether or not a reply
        is requested.  A failed send is logged and otherwise ignored.
        @param envelope: The received envelope.
        @type envelope: L{Envelope}
        @param result: The request result.
        @type result: object
        """
        sn = envelope.sn
        duration = Timer(envelope.ts, time())
        replyto = envelope.replyto
        log.info('%s processed in: %s', sn, duration)
        if not replyto:
            return
        try:
            self.producer.send(
                replyto,
                sn=sn,
                any=envelope.any,
                result=result)
        except Exception:
            # Best effort; KeyboardInterrupt/SystemExit are not swallowed.
            log.exception('send failed:\n%s', result)
183
class EmptyPlugin:
    """
    A placeholder plugin that provides no classes.
    Used when the appropriate plugin cannot be found.
    """

    def getpool(self):
        """
        Get the pool on which tasks for this plugin are run.
        @return: A newly created L{Immediate} pool.
        @rtype: L{Immediate}
        """
        pool = Immediate()
        return pool

    def provides(self, classname):
        """
        Get whether this plugin provides the named class.
        @param classname: A class name.
        @type classname: str
        @return: Always False.
        @rtype: bool
        """
        return False

    def dispatch(self, request):
        """
        Dispatch the request using a L{Dispatcher} built from
        an empty dict.
        @param request: The request to dispatch.
        @return: Whatever the dispatcher returns.
        """
        return Dispatcher({}).dispatch(request)
200
class Scheduler(PendingThread):
    """
    The pending request scheduler.
    Processes the I{pending} queue.
    @ivar plugins: A collection of loaded plugins.
    @type plugins: list
    @ivar producers: A cache of AMQP producers keyed by broker URL.
    @type producers: dict
    """

    def __init__(self, plugins):
        """
        @param plugins: A collection of loaded plugins.
        @type plugins: list
        """
        PendingThread.__init__(self)
        self.plugins = plugins
        self.producers = {}

    def dispatch(self, envelope):
        """
        Wrap the envelope in a L{Task} and run it on the pool of the
        plugin that provides the requested class.
        @param envelope: A gofer messaging envelope.
        @type envelope: L{Envelope}
        """
        url = envelope.url
        plugin = self.findplugin(envelope)
        producer = self.producer(url)
        task = Task(plugin, envelope, producer, self.commit)
        plugin.getpool().run(task)

    def findplugin(self, envelope):
        """
        Locate the plugin providing the class named in the I{request}
        embedded in the envelope.  The first matching plugin wins;
        an L{EmptyPlugin} is returned when none matches.
        @param envelope: A gofer messaging envelope.
        @type envelope: L{Envelope}
        @return: The appropriate plugin.
        @rtype: Plugin
        """
        wanted = Envelope(envelope.request).classname
        for candidate in self.plugins:
            if not candidate.provides(wanted):
                continue
            return candidate
        return EmptyPlugin()

    def producer(self, url):
        """
        Get the producer for the URL, creating and caching it
        on first use.
        @param url: The URL of the broker the request was received.
        @type url: str
        @return: The appropriate producer.
        @rtype: L{Producer}
        """
        cached = self.producers.get(url)
        if cached is not None:
            return cached
        created = Producer(url=url)
        self.producers[url] = created
        return created
268
class Context:
    """
    Remote method invocation context.
    Gives method implementations access to the call context
    of the request being dispatched.
    """

    @classmethod
    def current(cls):
        """
        Get the current call context.
        @return: The thread-local context held by L{Task}.
        @rtype: L{Local}
        """
        context = Task.context
        return context
281
class Progress:
    """
    Provides support for progress reporting.
    @ivar __task: The current task.
    @type __task: L{Task}
    @ivar total: The total work units.
    @type total: int
    @ivar completed: The completed work units.
    @type completed: int
    @ivar details: The reported details.
    @type details: object
    """

    def __init__(self, task):
        """
        @param task: The current task.
        @type task: L{Task}
        """
        self.__task = task
        self.total = 0
        self.completed = 0
        self.details = {}

    def report(self):
        """
        Send the progress report.
        The current I{total}, I{completed} and I{details} are sent to
        the I{replyto} of the task's envelope with status I{progress}.
        Nothing is sent when the envelope has no I{replyto}.
        A failed send is logged and otherwise ignored.
        """
        task = self.__task
        envelope = task.envelope
        replyto = envelope.replyto
        if not replyto:
            return
        try:
            task.producer.send(
                replyto,
                sn=envelope.sn,
                any=envelope.any,
                status='progress',
                total=self.total,
                completed=self.completed,
                details=self.details)
        except Exception:
            # Best effort: a failed report must not abort the running
            # method.  KeyboardInterrupt/SystemExit are not swallowed.
            log.exception('send (progress), failed')
326
class Cancelled:
    """
    A callable placed on the Context that plugin methods
    invoke to find out whether the request was cancelled.
    @ivar sn: The serial number of the request.
    @type sn: str
    @ivar tracker: The cancellation tracker.
    @type tracker: L{Tracker}
    """

    def __init__(self, sn):
        """
        @param sn: Serial number.
        @type sn: str
        """
        self.sn = sn
        self.tracker = Tracker()

    def __call__(self):
        """
        Ask the tracker whether this serial number is cancelled.
        @return: Whatever the tracker reports for the serial number.
        """
        tracker = self.tracker
        return tracker.cancelled(self.sn)

    def __del__(self):
        """
        Remove the serial number from the tracker when this
        object is finalized.
        """
        tracker = self.tracker
        tracker.remove(self.sn)
349