Package gofer :: Package messaging :: Module consumer
[hide private]
[frames] | no frames]

Source Code for Module gofer.messaging.consumer

  1  # 
  2  # Copyright (c) 2011 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU Lesser General Public 
  5  # License as published by the Free Software Foundation; either version 
  6  # 2 of the License (LGPLv2) or (at your option) any later version. 
  7  # There is NO WARRANTY for this software, express or implied, 
  8  # including the implied warranties of MERCHANTABILITY, 
  9  # NON-INFRINGEMENT, or FITNESS FOR A PARTICULAR PURPOSE. You should 
 10  # have received a copy of LGPLv2 along with this software; if not, see 
 11  # http://www.gnu.org/licenses/old-licenses/lgpl-2.0.txt. 
 12  # 
 13  # Jeff Ortel <jortel@redhat.com> 
 14  # 
 15   
 16   
 17  """ 
 18  Provides AMQP message consumer classes. 
 19  """ 
 20   
 21  from time import sleep 
 22  from threading import Thread 
 23  from gofer.messaging import * 
 24  from gofer.messaging.endpoint import Endpoint 
 25  from qpid.messaging import LinkError, Empty 
 26  from logging import getLogger 
 27   
 28  log = getLogger(__name__) 
29 30 31 -class ReceiverThread(Thread):
32 """ 33 Message consumer thread. 34 @cvar WAIT: How long (seconds) to wait for messages. 35 @type WAIT: int 36 @ivar __run: The main run latch. 37 @type __run: bool 38 @ivar __consumer: The (target) consumer. 39 @type __consumer: L{Consumer} 40 """ 41 42 WAIT = 3 43
44 - def __init__(self, consumer):
45 """ 46 @param consumer: The (target) consumer that is notified 47 when messages are fetched. 48 @type consumer: L{Consumer} 49 """ 50 self.__run = True 51 self.__consumer = consumer 52 Thread.__init__(self, name=consumer.id()) 53 self.setDaemon(True)
54
55 - def run(self):
56 """ 57 Thread main(). 58 Consumer messages and forward to the (target) consumer. 59 """ 60 msg = None 61 receiver = self.__open() 62 while self.__run: 63 try: 64 msg = self.__fetch(receiver) 65 if msg: 66 self.__consumer.received(msg) 67 log.debug('ready') 68 except LinkError: 69 log.error('aborting', exc_info=1) 70 return 71 except: 72 log.error('failed:\n%s', msg, exc_info=1) 73 receiver.close() 74 log.info('stopped')
75
76 - def stop(self):
77 """ 78 Stop the thread. 79 """ 80 self.__run = False
81
82 - def __open(self):
83 """ 84 Open the AMQP receiver. 85 @return: The opened receiver. 86 @rtype: Receiver 87 """ 88 addr = self.__consumer.address() 89 ssn = self.__consumer.session() 90 log.debug('open %s', addr) 91 return ssn.receiver(addr)
92
93 - def __fetch(self, receiver):
94 """ 95 Fetch the next available message. 96 @param receiver: An AMQP receiver. 97 @type receiver: Receiver 98 """ 99 try: 100 return receiver.fetch(timeout=self.WAIT) 101 except Empty: 102 # normal 103 pass 104 except: 105 sleep(self.WAIT) 106 raise
107
108 109 -class Consumer(Endpoint):
110 """ 111 An AMQP (abstract) consumer. 112 The received() method needs to be overridden. 113 @ivar __started: Indicates that start() has been called. 114 @type __started: bool 115 @ivar __destination: The AMQP destination to consume. 116 @type __destination: L{Destination} 117 @ivar __thread: The receiver thread. 118 @type __thread: L{ReceiverThread} 119 """ 120 121 @classmethod
122 - def subject(cls, message):
123 """ 124 Extract the message subject. 125 @param message: The received message. 126 @type message: qpid.messaging.Message 127 @return: The message subject 128 @rtype: str 129 """ 130 return message.properties.get('qpid.subject')
131
132 - def __init__(self, destination, **options):
133 """ 134 @param destination: The destination to consumer. 135 @type destination: L{Destination} 136 """ 137 Endpoint.__init__(self, **options) 138 self.__started = False 139 self.__destination = destination 140 self.__thread = ReceiverThread(self)
141
142 - def id(self):
143 """ 144 Get the endpoint id 145 @return: The destination (simple) address. 146 @rtype: str 147 """ 148 self._lock() 149 try: 150 return repr(self.__destination) 151 finally: 152 self._unlock()
153
154 - def address(self):
155 """ 156 Get the AMQP address for this endpoint. 157 @return: The AMQP address. 158 @rtype: str 159 """ 160 self._lock() 161 try: 162 return str(self.__destination) 163 finally: 164 self._unlock()
165
166 - def start(self):
167 """ 168 Start processing messages on the queue. 169 """ 170 self._lock() 171 try: 172 if self.__started: 173 return 174 self.__thread.start() 175 self.__started = True 176 finally: 177 self._unlock()
178
179 - def stop(self):
180 """ 181 Stop processing requests. 182 """ 183 self._lock() 184 try: 185 if not self.__started: 186 return 187 self.__thread.stop() 188 self.__thread.join(90) 189 self.__started = False 190 finally: 191 self._unlock()
192
193 - def close(self):
194 """ 195 Close the consumer. 196 Stop the receiver thread. 197 """ 198 self._lock() 199 try: 200 self.stop() 201 finally: 202 self._unlock() 203 Endpoint.close(self)
204
205 - def join(self):
206 """ 207 Join the worker thread. 208 """ 209 self.__thread.join()
210
211 - def received(self, message):
212 """ 213 Process received request. 214 Inject subject & destination.uuid. 215 @param message: The received message. 216 @type message: qpid.messaging.Message 217 """ 218 self._lock() 219 try: 220 self.__received(message) 221 finally: 222 self._unlock()
223
224 - def valid(self, envelope):
225 """ 226 Check to see if the envelope is valid. 227 @param envelope: The received envelope. 228 @type envelope: qpid.messaging.Message 229 """ 230 valid = True 231 if envelope.version != version: 232 valid = False 233 log.warn('{%s} version mismatch (discarded):\n%s', 234 self.id(), envelope) 235 return valid
236
237 - def dispatch(self, envelope):
238 """ 239 Dispatch received request. 240 @param envelope: The received envelope. 241 @type envelope: qpid.messaging.Message 242 """ 243 pass
244
245 - def __received(self, message):
246 """ 247 Process received request. 248 Inject subject & destination.uuid. 249 @param message: The received message. 250 @type message: qpid.messaging.Message 251 """ 252 envelope = Envelope() 253 envelope.load(message.content) 254 envelope.subject = self.subject(message) 255 envelope.ttl = message.ttl 256 log.debug('{%s} received:\n%s', self.id(), envelope) 257 if self.valid(envelope): 258 self.dispatch(envelope) 259 self.ack()
260
261 262 -class Reader(Endpoint):
263 """ 264 An AMQP message reader. 265 @ivar __opened: Indicates that open() has been called. 266 @type __opened: bool 267 @ivar __receiver: An AMQP receiver to read. 268 @type __receiver: Receiver 269 @ivar __destination: The AMQP destination to read. 270 @type __destination: L{Destination} 271 """ 272
273 - def __init__(self, destination, **options):
274 """ 275 @param destination: The destination to consumer. 276 @type destination: L{Destination} 277 @param options: Options passed to Endpoint. 278 @type options: dict 279 """ 280 self.__opened = False 281 self.__receiver = None 282 self.__destination = destination 283 Endpoint.__init__(self, **options)
284
285 - def open(self):
286 """ 287 Open the reader. 288 """ 289 Endpoint.open(self) 290 self._lock() 291 try: 292 if self.__opened: 293 return 294 ssn = self.session() 295 addr = self.address() 296 log.debug('{%s} open %s', self.id(), addr) 297 self.__receiver = ssn.receiver(addr) 298 self.__opened = True 299 finally: 300 self._unlock()
301
302 - def close(self):
303 """ 304 Close the reader. 305 """ 306 self._lock() 307 try: 308 if not self.__opened: 309 return 310 self.__receiver.close() 311 self.__opened = False 312 finally: 313 self._unlock() 314 Endpoint.close(self)
315
316 - def next(self, timeout=90):
317 """ 318 Get the next envelope from the queue. 319 @param timeout: The read timeout. 320 @type timeout: int 321 @return: The next envelope. 322 @rtype: L{Envelope} 323 """ 324 msg = self.__fetch(timeout) 325 if msg: 326 envelope = Envelope() 327 envelope.load(msg.content) 328 envelope.subject = Consumer.subject(msg) 329 log.debug('{%s} read next:\n%s', self.id(), envelope) 330 return envelope
331
332 - def read(self, timeout=90):
333 """ 334 Get the next message from the queue. 335 @param timeout: The read timeout. 336 @type timeout: int 337 @return: The next message. 338 @rtype: Message 339 """ 340 return self.__fetch(timeout)
341
342 - def search(self, sn, timeout=90):
343 """ 344 Seach the reply queue for the envelope with 345 the matching serial #. 346 @param sn: The expected serial number. 347 @type sn: str 348 @param timeout: The read timeout. 349 @type timeout: int 350 @return: The next envelope. 351 @rtype: L{Envelope} 352 """ 353 log.debug('{%s} searching for: sn=%s', self.id(), sn) 354 while True: 355 envelope = self.next(timeout) 356 if not envelope: 357 return 358 if sn == envelope.sn: 359 log.debug('{%s} search found:\n%s', self.id(), envelope) 360 return envelope 361 else: 362 log.debug('{%s} search discarding:\n%s', self.id(), envelope) 363 self.ack()
364
365 - def address(self):
366 """ 367 Get the AMQP address for this endpoint. 368 @return: The AMQP address. 369 @rtype: str 370 """ 371 self._lock() 372 try: 373 return str(self.__destination) 374 finally: 375 self._unlock()
376
377 - def __fetch(self, timeout):
378 """ 379 Fetch the next message. 380 @param timeout: The read timeout. 381 @type timeout: int 382 @return: The next message, or (None). 383 @rtype: Message 384 """ 385 try: 386 self.open() 387 return self.__receiver.fetch(timeout=timeout) 388 except Empty: 389 # normal 390 pass 391 except: 392 log.error(self.id(), exc_info=1)
393