1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 Provides (local) message storage classes.
18 """
19
20 import os
21 from stat import *
22 from gofer import NAME, Singleton
23 from gofer.messaging import *
24 from gofer.rmi.window import Window
25 from gofer.rmi.tracker import Tracker
26 from time import time
27 from threading import Thread, RLock, Event
28 from logging import getLogger
29
30 log = getLogger(__name__)
31
32
34 """
35 Persistent (local) storage of I{pending} envelopes that have
36 been processed of an AMQP queue. Most likely use is for messages
37 with a future I{window} which cannot be processed immediately.
38 @cvar ROOT: The root directory used for storage.
39 @type ROOT: str
40 @ivar id: The queue id.
41 @type id: str
42 @ivar lastmod: Last (directory) modification.
43 @type lastmod: int
44 @ivar pending: The queue of pending envelopes.
45 @type pending: [Envelope,..]
46 @ivar uncommitted: A list (removed) of files pending commit.
47 @type uncommitted: [path,..]
48 """
49
50 __metaclass__ = Singleton
51
52 ROOT = '/var/lib/%s/messaging/pending' % NAME
53
def __init__(self):
    """
    Set up the in-memory queue state, then ensure the storage
    directory exists and load the envelopes already stored in it.
    """
    self.__pending = []
    # serial number -> envelope moved out of the pending list.
    self.__uncommitted = {}
    self.__mutex = RLock()
    # set by add(); waited on while polling for a ready envelope.
    self.__event = Event()
    self.__mkdir()
    self.__load()
61
def add(self, url, envelope):
    """
    Enqueue the specified envelope.
    The envelope is stamped with the current time and the broker URL,
    written to its storage file, registered with the L{Tracker} and
    inserted at the head of the in-memory pending list.
    @param url: The broker URL.
    @type url: str
    @param envelope: An L{Envelope}
    @type envelope: L{Envelope}
    """
    envelope.ts = time()
    envelope.url = url
    fn = self.__fn(envelope)
    f = open(fn, 'w')
    try:
        f.write(envelope.dump())
    finally:
        # always release the file, even when dump() or write() raises.
        f.close()
    log.debug('add pending:\n%s', envelope)
    self.__lock()
    try:
        tracker = Tracker()
        tracker.add(envelope.sn, envelope.any)
        self.__pending.insert(0, envelope)
    finally:
        self.__unlock()
    # signal anything waiting on the event that an entry was added.
    self.__event.set()
85
def get(self, wait=10):
    """
    Get the next pending envelope.
    The internal event is cleared before returning, whether or not
    an envelope was obtained.
    @param wait: The number of seconds to block.  Must be >= 0.
    @type wait: int
    @return: An L{Envelope}
    @rtype: L{Envelope}
    """
    assert wait >= 0
    try:
        return self.__get(wait)
    finally:
        self.__event.clear()
99
def commit(self, sn):
    """
    Commit an entry returned by get().
    The entry is dropped from the uncommitted map and its storage
    file is deleted.
    @param sn: The serial number to commit.
    @type sn: str
    @raise KeyError: when not found.
    """
    self.__lock()
    try:
        log.debug('commit: %s', sn)
        envelope = self.__uncommitted.pop(sn)
        fn = self.__fn(envelope)
        os.unlink(fn)
    finally:
        self.__unlock()
115
def __purge(self, envelope):
    """
    Purge the queue entry.
    Deletes the storage file and then removes the envelope from
    the pending list.
    @param envelope: An L{Envelope}
    @type envelope: L{Envelope}
    """
    self.__lock()
    try:
        log.info('purge:%s', envelope.sn)
        fn = self.__fn(envelope)
        os.unlink(fn)
        self.__pending.remove(envelope)
    finally:
        self.__unlock()
130
def __pendingcommit(self, envelope):
    """
    Move the envelope to the uncommitted list.
    The envelope is removed from the pending list and stored in
    the uncommitted map, keyed by its serial number.
    @param envelope: An L{Envelope}
    @type envelope: L{Envelope}
    """
    self.__lock()
    try:
        self.__pending.remove(envelope)
        self.__uncommitted[envelope.sn] = envelope
    finally:
        self.__unlock()
143
def __load(self):
    """
    Load the in-memory queue from filesystem.
    Every file in the storage directory is imported; files that
    cannot be imported are skipped.  The resulting list is ordered
    newest-first by file create timestamp, matching add() (which
    inserts at the head) and __pop() (which consumes from the tail),
    so that the oldest stored envelope is processed first.
    """
    found = []
    for fn in os.listdir(self.ROOT):
        path = os.path.join(self.ROOT, fn)
        envelope = self.__import(path)
        if not envelope:
            continue
        found.append((self.__created(path), envelope))
    # sort on the timestamp only: envelopes themselves are never
    # compared, even when two files share a create timestamp.
    found.sort(key=lambda entry: entry[0], reverse=True)
    self.__lock()
    try:
        self.__pending = [entry[1] for entry in found]
    finally:
        self.__unlock()
163
def __import(self, path):
    """
    Import a stored envelope.
    A file that cannot be read or loaded is logged and deleted.
    @param path: An absolute file path.
    @type path: str
    @return: An imported envelope, or None when the file was discarded.
    @rtype: L{Envelope}
    """
    try:
        s = self.__read(path)
        envelope = Envelope()
        envelope.load(s)
        return envelope
    except Exception:
        # Exception rather than a bare except: an interrupt or
        # interpreter exit must not cause the file to be deleted.
        log.exception(path)
        log.error('%s, discarded', path)
        os.unlink(path)
    return None
181
def __get(self, wait=1):
    """
    Get the next pending envelope.
    Polls a snapshot of the pending list up to I{wait} times,
    waiting up to one second on the internal event between attempts.
    @param wait: The number of seconds to wait for a pending item.
    @type wait: int
    @return: The next ready L{Envelope}, or None when none became
        ready within the wait.
    @rtype: L{Envelope}
    """
    # 'wait > 0' (not just truthiness) so that a fractional wait
    # which steps past zero still terminates the loop.
    while wait > 0:
        queue = self.__copy(self.__pending)
        popped = self.__pop(queue)
        if popped:
            log.debug('popped: (%s) %s', popped.url, popped.sn)
            return popped
        wait -= 1
        self.__event.wait(1)
    return None
199
def __pop(self, queue):
    """
    Pop and return the next I{ready} entry.
    Entries that have expired (TTL), are purged.
    Entries that have a future I{window} are excluded.
    An entry that raises while being examined is logged and purged.
    @param queue: An ordered list of candidate entries; consumed
        from the tail.
    @type queue: list
    @return: An L{Envelope}, or None when no entry is ready.
    @rtype: L{Envelope}
    """
    popped = None
    while queue:
        envelope = queue.pop()
        try:
            if self.__expired(envelope):
                self.__purge(envelope)
                continue
            if self.__delayed(envelope):
                continue
            # ready: hand it out and await commit().
            self.__pendingcommit(envelope)
            self.__adjustTTL(envelope)
            popped = envelope
            break
        except Exception:
            log.error(envelope, exc_info=1)
            self.__purge(envelope)
    return popped
229
def __mkdir(self):
    """
    Ensure the directory exists.
    """
    path = self.ROOT
    try:
        os.makedirs(path)
    except OSError:
        # creating first and checking afterwards avoids failing when
        # the path appears between an existence check and makedirs().
        if not os.path.exists(path):
            raise
237
def __created(self, path):
    """
    Get create timestamp.
    @param path: A file path.
    @type path: str
    @return: The file create timestamp (the ST_CTIME stat field).
    @rtype: int
    """
    info = os.stat(path)
    return info[ST_CTIME]
246
248 """
249 Get modification timestamp.
250 @return: The file modification timestamp.
251 @rtype: int
252 """
253 stat = os.stat(path)
254 return stat[ST_MTIME]
255
def __fn(self, envelope):
    """
    Get the qualified file name for an entry.
    The file is named for the envelope serial number and lives
    directly under ROOT.
    @param envelope: A queue entry.
    @type envelope: L{Envelope}
    @return: The absolute file path.
    @rtype: str
    """
    return os.path.join(self.ROOT, envelope.sn)
265
def __expired(self, envelope):
    """
    Get whether the envelope has expired.
    Only a float TTL is evaluated; any other TTL value is treated
    as not expired.
    @param envelope: A queue entry.
    @type envelope: L{Envelope}
    @return: True when expired based on TTL.
    @rtype: bool
    """
    now = time()
    if isinstance(envelope.ttl, float):
        # time spent on the queue since add() stamped the envelope.
        elapsed = (now - envelope.ts)
        if envelope.ttl < elapsed:
            log.info('expired:\n%s', envelope)
            return True
    return False
281
def __adjustTTL(self, envelope):
    """
    Adjust the TTL based on time spent on the queue.
    Only a float TTL is adjusted; any other TTL value is left as is.
    @param envelope: A queue entry.
    @type envelope: L{Envelope}
    """
    if isinstance(envelope.ttl, float):
        elapsed = (time() - envelope.ts)
        envelope.ttl -= elapsed
291
def __delayed(self, envelope):
    """
    Get whether the envelope has a future window.
    Cancelled requests are not considered to be delayed and
    the window is ignored.
    @param envelope: An L{Envelope}
    @type envelope: L{Envelope}
    @return: True when window in the future.
    @rtype: bool
    """
    tracker = Tracker()
    if envelope.window and not tracker.cancelled(envelope.sn):
        window = Window(envelope.window)
        if window.future():
            log.info('deferring:\n%s', envelope)
            return True
    return False
309
def __copy(self, collection):
    """
    Get a shallow copy of a list, taken while holding the lock.
    @param collection: The list to copy.
    @type collection: list
    @return: A new list containing the same items.
    @rtype: list
    """
    self.__lock()
    try:
        return collection[:]
    finally:
        self.__unlock()
316
def __read(self, path):
    """
    Read the entire content of a file.
    @param path: A file path.
    @type path: str
    @return: The file content.
    @rtype: str
    """
    f = open(path)
    try:
        return f.read()
    finally:
        f.close()
323
326
329
330
332 """
333 A pending queue receiver.
334 @ivar __run: The main run loop flag.
335 @type __run: bool
336 @ivar queue: The L{PendingQueue} being read.
337 @type queue: L{PendingQueue}
338 @ivar consumer: The queue listener.
339 @type consumer: L{gofer.messaging.consumer.Consumer}
340 """
341
def __init__(self):
    """
    Set the run flag, attach the L{PendingQueue} and initialize
    the thread as a daemon named I{pending}.
    """
    self.__run = True
    self.queue = PendingQueue()
    Thread.__init__(self, name='pending')
    self.setDaemon(True)
347
def run(self):
    """
    Main receiver (thread).
    Read and dispatch envelopes until the run flag is cleared.
    """
    log.info('started')
    while self.__run:
        # bounded wait so the run flag is re-checked regularly.
        envelope = self.queue.get(3)
        if not envelope:
            continue
        self.dispatch(envelope)
    log.info('stopped')
360
def dispatch(self, envelope):
    """
    Dispatch the envelope.
    This implementation does nothing.
    @param envelope: An L{Envelope}
    @type envelope: L{Envelope}
    """
    pass
368
370 """
371 Commit a dispatched envelope.
372 @param sn: The serial number to commit.
373 @type sn: str
374 """
375 try:
376 self.queue.commit(sn)
377 except KeyError:
378 log.error('%s, not valid for commit', sn)
379
381 """
382 Stop the receiver.
383 """
384 self.__run = False
385