1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 """
18 Provides AMQP message consumer classes.
19 """
20
21 from time import sleep
22 from threading import Thread
23 from gofer.messaging import *
24 from gofer.messaging.endpoint import Endpoint
25 from qpid.messaging import LinkError, Empty
26 from logging import getLogger
27
28 log = getLogger(__name__)
class ReceiverThread(Thread):
    """
    Message consumer thread.
    Fetches messages from an AMQP receiver and hands each one to
    the (target) consumer.
    @cvar WAIT: How long (seconds) to wait for messages.
    @type WAIT: int
    @ivar __run: The main run latch.
    @type __run: bool
    @ivar __consumer: The (target) consumer.
    @type __consumer: L{Consumer}
    """

    # NOTE(review): the class/def header lines and the indentation of
    # this class were reconstructed from call sites and docstrings;
    # confirm the block nesting against the upstream file.

    WAIT = 3

    def __init__(self, consumer):
        """
        @param consumer: The (target) consumer that is notified
            when messages are fetched.
        @type consumer: L{Consumer}
        """
        self.__run = True
        self.__consumer = consumer
        # The thread is named after the consumer's id.
        Thread.__init__(self, name=consumer.id())
        self.setDaemon(True)

    def run(self):
        """
        Thread main().
        Consume messages and forward them to the (target) consumer
        until stop() clears the run latch.
        """
        receiver = self.__open()
        while self.__run:
            # Reset on every pass so the failure log below never
            # reports a message left over from an earlier iteration.
            msg = None
            try:
                msg = self.__fetch(receiver)
                if msg:
                    self.__consumer.received(msg)
                log.debug('ready')
            except LinkError:
                # The thread ends here without closing the receiver.
                log.error('aborting', exc_info=1)
                return
            except Exception:
                # Any other failure is logged and the loop keeps going.
                log.error('failed:\n%s', msg, exc_info=1)
        receiver.close()
        log.info('stopped')

    def stop(self):
        """
        Stop the thread.
        Only clears the run latch; run() notices it on its next pass.
        """
        self.__run = False

    def __open(self):
        """
        Open the AMQP receiver.
        @return: The opened receiver.
        @rtype: Receiver
        """
        addr = self.__consumer.address()
        ssn = self.__consumer.session()
        log.debug('open %s', addr)
        return ssn.receiver(addr)

    def __fetch(self, receiver):
        """
        Fetch the next available message.
        @param receiver: An AMQP receiver.
        @type receiver: Receiver
        @return: The fetched message, or None when nothing
            arrived within WAIT seconds.
        """
        try:
            return receiver.fetch(timeout=self.WAIT)
        except Empty:
            return None
        except Exception:
            # Pause before re-raising so that run(), which logs the
            # error and loops, does not retry immediately.
            sleep(self.WAIT)
            raise
107
class Consumer(Endpoint):
    """
    An AMQP (abstract) consumer.
    Messages are fetched by a L{ReceiverThread} and passed to
    received(); subclasses override dispatch(), which does nothing
    here, to handle each valid envelope.
    @ivar __started: Indicates that start() has been called.
    @type __started: bool
    @ivar __destination: The AMQP destination to consume.
    @type __destination: L{Destination}
    @ivar __thread: The receiver thread.
    @type __thread: L{ReceiverThread}
    """

    # NOTE(review): the class header, most def lines and all
    # indentation were reconstructed from call sites and docstrings;
    # confirm the block nesting against the upstream file.

    @classmethod
    def subject(cls, message):
        """
        Extract the message subject.
        @param message: The received message.
        @type message: qpid.messaging.Message
        @return: The message subject
        @rtype: str
        """
        return message.properties.get('qpid.subject')

    def __init__(self, destination, **options):
        """
        @param destination: The destination to consume.
        @type destination: L{Destination}
        @param options: Options passed to Endpoint.
        @type options: dict
        """
        Endpoint.__init__(self, **options)
        self.__started = False
        self.__destination = destination
        # Must come last: ReceiverThread calls self.id(), which
        # reads __destination.
        self.__thread = ReceiverThread(self)

    def id(self):
        """
        Get the endpoint id
        @return: The repr() of the destination.
        @rtype: str
        """
        self._lock()
        try:
            return repr(self.__destination)
        finally:
            self._unlock()

    def address(self):
        """
        Get the AMQP address for this endpoint.
        @return: The AMQP address: str() of the destination.
        @rtype: str
        """
        self._lock()
        try:
            return str(self.__destination)
        finally:
            self._unlock()

    def start(self):
        """
        Start processing messages on the queue.
        Does nothing when already started.
        """
        # NOTE(review): the same thread object is reused, so start()
        # after stop() would try to start a finished thread - confirm
        # whether restart is meant to be supported.
        self._lock()
        try:
            if self.__started:
                return
            self.__thread.start()
            self.__started = True
        finally:
            self._unlock()

    def stop(self):
        """
        Stop processing requests.
        Does nothing unless started.  Waits up to 90 seconds for
        the receiver thread to finish.
        """
        self._lock()
        try:
            if not self.__started:
                return
            self.__thread.stop()
            self.__thread.join(90)
            self.__started = False
        finally:
            self._unlock()

    def close(self):
        """
        Close the consumer.
        Stop the receiver thread, then close the endpoint.
        """
        self._lock()
        try:
            # stop() takes the lock again - presumably the Endpoint
            # lock is reentrant; TODO confirm.
            self.stop()
        finally:
            self._unlock()
        Endpoint.close(self)

    def join(self):
        """
        Join the worker thread.
        Blocks (no timeout) until the receiver thread ends.
        """
        self.__thread.join()

    def received(self, message):
        """
        Process a received message while holding the endpoint lock.
        Called by the receiver thread for each fetched message.
        @param message: The received message.
        @type message: qpid.messaging.Message
        """
        self._lock()
        try:
            self.__received(message)
        finally:
            self._unlock()

    def valid(self, envelope):
        """
        Check to see if the envelope is valid.
        An envelope is valid when its version equals the module
        level I{version}; a mismatch is logged as a warning.
        @param envelope: The received envelope.
        @type envelope: L{Envelope}
        @return: True when valid.
        @rtype: bool
        """
        # 'version' is not defined in this module's visible code -
        # presumably supplied by the gofer.messaging star import.
        if envelope.version == version:
            return True
        log.warning(
            '{%s} version mismatch (discarded):\n%s',
            self.id(),
            envelope)
        return False

    def dispatch(self, envelope):
        """
        Dispatch received request.
        Does nothing here; intended to be overridden.
        @param envelope: The received envelope.
        @type envelope: L{Envelope}
        """
        pass

    def __received(self, message):
        """
        Process received request.
        Load the message content into an envelope, inject the
        subject and ttl, dispatch it when valid and then ack.
        @param message: The received message.
        @type message: qpid.messaging.Message
        """
        envelope = Envelope()
        envelope.load(message.content)
        envelope.subject = self.subject(message)
        envelope.ttl = message.ttl
        log.debug('{%s} received:\n%s', self.id(), envelope)
        if self.valid(envelope):
            self.dispatch(envelope)
        # NOTE(review): ack() is placed outside the if, so discarded
        # envelopes are acknowledged too - confirm nesting upstream.
        self.ack()
260
class Reader(Endpoint):
    """
    An AMQP message reader.
    @ivar __opened: Indicates that open() has been called.
    @type __opened: bool
    @ivar __receiver: An AMQP receiver to read.
    @type __receiver: Receiver
    @ivar __destination: The AMQP destination to read.
    @type __destination: L{Destination}
    """

    # NOTE(review): the class header (including the name "Reader"),
    # several def lines and all indentation were reconstructed from
    # call sites and docstrings; confirm against the upstream file.

    def __init__(self, destination, **options):
        """
        @param destination: The destination to consume.
        @type destination: L{Destination}
        @param options: Options passed to Endpoint.
        @type options: dict
        """
        self.__opened = False
        self.__receiver = None
        self.__destination = destination
        Endpoint.__init__(self, **options)

    def open(self):
        """
        Open the reader.
        Creates the AMQP receiver; does nothing when already opened.
        """
        Endpoint.open(self)
        self._lock()
        try:
            if self.__opened:
                return
            ssn = self.session()
            addr = self.address()
            log.debug('{%s} open %s', self.id(), addr)
            self.__receiver = ssn.receiver(addr)
            self.__opened = True
        finally:
            self._unlock()

    def close(self):
        """
        Close the reader.
        Closes the AMQP receiver and then the endpoint; does
        nothing unless opened.
        """
        self._lock()
        try:
            if not self.__opened:
                return
            self.__receiver.close()
            self.__opened = False
        finally:
            self._unlock()
        # NOTE(review): placed after the try block, so it is skipped
        # by the early return above - confirm nesting upstream.
        Endpoint.close(self)

    def next(self, timeout=90):
        """
        Get the next envelope from the queue.
        @param timeout: The read timeout.
        @type timeout: int
        @return: The next envelope, or None when no message
            was fetched.
        @rtype: L{Envelope}
        """
        msg = self.__fetch(timeout)
        if not msg:
            return None
        envelope = Envelope()
        envelope.load(msg.content)
        envelope.subject = Consumer.subject(msg)
        log.debug('{%s} read next:\n%s', self.id(), envelope)
        return envelope

    def read(self, timeout=90):
        """
        Get the next message from the queue.
        @param timeout: The read timeout.
        @type timeout: int
        @return: The next message, or None when no message
            was fetched.
        @rtype: Message
        """
        return self.__fetch(timeout)

    def search(self, sn, timeout=90):
        """
        Search the reply queue for the envelope with
        the matching serial #.
        Envelopes with a different serial # are acked and skipped.
        @param sn: The expected serial number.
        @type sn: str
        @param timeout: The read timeout (applied to each read).
        @type timeout: int
        @return: The matching envelope, or None when a read
            returned nothing.
        @rtype: L{Envelope}
        """
        log.debug('{%s} searching for: sn=%s', self.id(), sn)
        while True:
            envelope = self.next(timeout)
            if not envelope:
                return None
            if sn == envelope.sn:
                log.debug('{%s} search found:\n%s', self.id(), envelope)
                return envelope
            log.debug('{%s} search discarding:\n%s', self.id(), envelope)
            self.ack()

    def address(self):
        """
        Get the AMQP address for this endpoint.
        @return: The AMQP address: str() of the destination.
        @rtype: str
        """
        self._lock()
        try:
            return str(self.__destination)
        finally:
            self._unlock()

    def __fetch(self, timeout):
        """
        Fetch the next message.
        Opens the reader first when needed.  Errors are logged
        and reported to the caller as None.
        @param timeout: The read timeout.
        @type timeout: int
        @return: The next message, or (None).
        @rtype: Message
        """
        try:
            self.open()
            return self.__receiver.fetch(timeout=timeout)
        except Empty:
            # nothing arrived within the timeout
            return None
        except Exception:
            # Exception, not a bare except, so KeyboardInterrupt and
            # SystemExit raised during the blocking read still
            # propagate instead of being logged and swallowed.
            log.error(self.id(), exc_info=1)
            return None
393