Package gofer :: Package rmi :: Module store
[hide private]
[frames] | no frames]

Source Code for Module gofer.rmi.store

  1  # 
  2  # Copyright (c) 2011 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU Lesser General Public 
  5  # License as published by the Free Software Foundation; either version 
  6  # 2 of the License (LGPLv2) or (at your option) any later version. 
  7  # There is NO WARRANTY for this software, express or implied, 
  8  # including the implied warranties of MERCHANTABILITY, 
  9  # NON-INFRINGEMENT, or FITNESS FOR A PARTICULAR PURPOSE. You should 
 10  # have received a copy of LGPLv2 along with this software; if not, see 
 11  # http://www.gnu.org/licenses/old-licenses/lgpl-2.0.txt. 
 12  # 
 13  # Jeff Ortel <jortel@redhat.com> 
 14  # 
 15   
 16  """ 
 17  Provides (local) message storage classes. 
 18  """ 
 19   
 20  import os 
 21  from stat import * 
 22  from gofer import NAME, Singleton 
 23  from gofer.messaging import * 
 24  from gofer.rmi.window import Window 
 25  from gofer.rmi.tracker import Tracker 
 26  from time import time 
 27  from threading import Thread, RLock, Event 
 28  from logging import getLogger 
 29   
 30  log = getLogger(__name__) 
 31   
 32   
33 -class PendingQueue:
34 """ 35 Persistent (local) storage of I{pending} envelopes that have 36 been processed of an AMQP queue. Most likely use is for messages 37 with a future I{window} which cannot be processed immediately. 38 @cvar ROOT: The root directory used for storage. 39 @type ROOT: str 40 @ivar id: The queue id. 41 @type id: str 42 @ivar lastmod: Last (directory) modification. 43 @type lastmod: int 44 @ivar pending: The queue of pending envelopes. 45 @type pending: [Envelope,..] 46 @ivar uncommitted: A list (removed) of files pending commit. 47 @type uncommitted: [path,..] 48 """ 49 50 __metaclass__ = Singleton 51 52 ROOT = '/var/lib/%s/messaging/pending' % NAME 53
54 - def __init__(self):
55 self.__pending = [] 56 self.__uncommitted = {} 57 self.__mutex = RLock() 58 self.__event = Event() 59 self.__mkdir() 60 self.__load()
61
62 - def add(self, url, envelope):
63 """ 64 Enqueue the specified envelope. 65 @param url: The broker URL. 66 @type url: str 67 @param envelope: An L{Envelope} 68 @type envelope: L{Envelope} 69 """ 70 envelope.ts = time() 71 envelope.url = url 72 fn = self.__fn(envelope) 73 f = open(fn, 'w') 74 f.write(envelope.dump()) 75 f.close() 76 log.debug('add pending:\n%s', envelope) 77 self.__lock() 78 try: 79 tracker = Tracker() 80 tracker.add(envelope.sn, envelope.any) 81 self.__pending.insert(0, envelope) 82 finally: 83 self.__unlock() 84 self.__event.set()
85
86 - def get(self, wait=10):
87 """ 88 Get the next pending envelope. 89 @param wait: The number of seconds to block. 90 @type wait: int 91 @return: An L{Envelope} 92 @rtype: L{Envelope} 93 """ 94 assert(wait >= 0) 95 try: 96 return self.__get(wait) 97 finally: 98 self.__event.clear()
99
100 - def commit(self, sn):
101 """ 102 Commit an entry returned by get(). 103 @param sn: The serial number to commit. 104 @type sn: str 105 @raise KeyError: when no found. 106 """ 107 self.__lock() 108 try: 109 log.debug('commit: %s', sn) 110 envelope = self.__uncommitted.pop(sn) 111 fn = self.__fn(envelope) 112 os.unlink(fn) 113 finally: 114 self.__unlock()
115
116 - def __purge(self, envelope):
117 """ 118 Purge the queue entry. 119 @param envelope: An L{Envelope} 120 @type envelope: L{Envelope} 121 """ 122 self.__lock() 123 try: 124 log.info('purge:%s', envelope.sn) 125 fn = self.__fn(envelope) 126 os.unlink(fn) 127 self.__pending.remove(envelope) 128 finally: 129 self.__unlock()
130
131 - def __pendingcommit(self, envelope):
132 """ 133 Move the envelope to the uncommitted list. 134 @param envelope: An L{Envelope} 135 @type envelope: L{Envelope} 136 """ 137 self.__lock() 138 try: 139 self.__pending.remove(envelope) 140 self.__uncommitted[envelope.sn] = envelope 141 finally: 142 self.__unlock()
143
144 - def __load(self):
145 """ 146 Load the in-memory queue from filesystem. 147 """ 148 path = os.path.join(self.ROOT) 149 pending = [] 150 for fn in os.listdir(path): 151 path = os.path.join(self.ROOT, fn) 152 envelope = self.__import(path) 153 if not envelope: 154 continue 155 ctime = self.__created(path) 156 pending.append((ctime, envelope)) 157 pending.sort() 158 self.__lock() 159 try: 160 self.__pending = [p[1] for p in pending] 161 finally: 162 self.__unlock()
163
164 - def __import(self, path):
165 """ 166 Import a stored envelpoe. 167 @param path: An absolute file path. 168 @type path: str 169 @return: An imported envelope. 170 @rtype: L{Envelope} 171 """ 172 try: 173 s = self.__read(path) 174 envelope = Envelope() 175 envelope.load(s) 176 return envelope 177 except: 178 log.exception(path) 179 log.error('%s, discarded', path) 180 os.unlink(path)
181
182 - def __get(self, wait=1):
183 """ 184 Get the next pending envelope. 185 @param wait: The number of seconds to wait for a pending item. 186 @type wait: int 187 @return: (url, L{Envelope}) 188 @rtype: L{Envelope} 189 """ 190 while wait: 191 queue = self.__copy(self.__pending) 192 popped = self.__pop(queue) 193 if popped: 194 log.debug('popped: (%s) %s', popped.url, popped.sn) 195 return popped 196 else: 197 wait -= 1 198 self.__event.wait(1)
199
200 - def __pop(self, queue):
201 """ 202 Pop and return the next I{ready} entry. 203 Entries that have expired (TTL), are purged. 204 Entries that have a future I{window} are excluded. 205 @param queue: An ordered list of candidate entries. 206 @type queue: list 207 @return: An L{Envelope} 208 @rtype: L{Envelope} 209 """ 210 popped = None 211 while queue: 212 envelope = queue.pop() 213 try: 214 if self.__expired(envelope): 215 self.__purge(envelope) 216 # TTL expired 217 continue 218 if self.__delayed(envelope): 219 # future 220 continue 221 self.__pendingcommit(envelope) 222 self.__adjustTTL(envelope) 223 popped = envelope 224 break 225 except Exception: 226 log.error(envelope, exc_info=1) 227 self.__purge(envelope) 228 return popped
229
230 - def __mkdir(self):
231 """ 232 Ensure the directory exists. 233 """ 234 path = self.ROOT 235 if not os.path.exists(path): 236 os.makedirs(path)
237
238 - def __created(self, path):
239 """ 240 Get create timestamp. 241 @return: The file create timestamp. 242 @rtype: int 243 """ 244 stat = os.stat(path) 245 return stat[ST_CTIME]
246
247 - def __modified(self, path):
248 """ 249 Get modification timestamp. 250 @return: The file modification timestamp. 251 @rtype: int 252 """ 253 stat = os.stat(path) 254 return stat[ST_MTIME]
255
256 - def __fn(self, envelope):
257 """ 258 Get the qualified file name for an entry. 259 @param envelope: A queue entry. 260 @type envelope: L{Envelope} 261 @return: The absolute file path. 262 @rtype: str 263 """ 264 return os.path.join(self.ROOT, envelope.sn)
265
266 - def __expired(self, envelope):
267 """ 268 Get whether the envelope has expired. 269 @param envelope: A queue entry. 270 @type envelope: L{Envelope} 271 @return: True when expired based on TTL. 272 @rtype: bool 273 """ 274 now = time() 275 if isinstance(envelope.ttl, float): 276 elapsed = (now-envelope.ts) 277 if envelope.ttl < elapsed: 278 log.info('expired:\n%s', envelope) 279 return True 280 return False
281
282 - def __adjustTTL(self, envelope):
283 """ 284 Adjust the TTL based on time spent on the queue. 285 @param envelope: A queue entry. 286 @type envelope: L{Envelope} 287 """ 288 if isinstance(envelope.ttl, float): 289 elapsed = (time()-envelope.ts) 290 envelope.ttl -= elapsed
291
292 - def __delayed(self, envelope):
293 """ 294 Get whether the envelope has a future window. 295 Cancelled requests are not considered to be delayed and 296 the window is ignored. 297 @param envelope: An L{Envelope} 298 @type envelope: L{Envelope} 299 @return: True when window in the future. 300 @rtype: bool 301 """ 302 tracker = Tracker() 303 if envelope.window and not tracker.cancelled(envelope.sn): 304 window = Window(envelope.window) 305 if window.future(): 306 log.info('deferring:\n%s', envelope) 307 return True 308 return False
309
310 - def __copy(self, collection):
311 self.__lock() 312 try: 313 return collection[:] 314 finally: 315 self.__unlock()
316
317 - def __read(self, path):
318 f = open(path) 319 try: 320 return f.read() 321 finally: 322 f.close()
323
324 - def __lock(self):
325 self.__mutex.acquire()
326
327 - def __unlock(self):
328 self.__mutex.release()
329 330
331 -class PendingThread(Thread):
332 """ 333 A pending queue receiver. 334 @ivar __run: The main run loop flag. 335 @type __run: bool 336 @ivar queue: The L{PendingQueue} being read. 337 @type queue: L{PendingQueue} 338 @ivar consumer: The queue listener. 339 @type consumer: L{gofer.messaging.consumer.Consumer} 340 """ 341
342 - def __init__(self):
343 self.__run = True 344 self.queue = PendingQueue() 345 Thread.__init__(self, name='pending') 346 self.setDaemon(True)
347
348 - def run(self):
349 """ 350 Main receiver (thread). 351 Read and dispatch envelopes. 352 """ 353 log.info('started') 354 while self.__run: 355 envelope = self.queue.get(3) 356 if not envelope: 357 continue 358 self.dispatch(envelope) 359 log.info('stopped')
360
361 - def dispatch(self, envelope):
362 """ 363 Dispatch the envelope. 364 @param envelope: An L{Envelope} 365 @type envelope: L{Envelope} 366 """ 367 pass
368
369 - def commit(self, sn):
370 """ 371 Commit a dispatched envelope. 372 @param sn: The serial number to commit. 373 @type sn: str 374 """ 375 try: 376 self.queue.commit(sn) 377 except KeyError: 378 log.error('%s, not valid for commit', sn)
379
380 - def stop(self):
381 """ 382 Stop the receiver. 383 """ 384 self.__run = False
385