Package gofer :: Package messaging :: Module producer
[hide private]
[frames | no frames]

Source Code for Module gofer.messaging.producer

  1  # 
  2  # Copyright (c) 2011 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU Lesser General Public 
  5  # License as published by the Free Software Foundation; either version 
  6  # 2 of the License (LGPLv2) or (at your option) any later version. 
  7  # There is NO WARRANTY for this software, express or implied, 
  8  # including the implied warranties of MERCHANTABILITY, 
  9  # NON-INFRINGEMENT, or FITNESS FOR A PARTICULAR PURPOSE. You should 
 10  # have received a copy of LGPLv2 along with this software; if not, see 
 11  # http://www.gnu.org/licenses/old-licenses/lgpl-2.0.txt. 
 12  # 
 13  # Jeff Ortel <jortel@redhat.com> 
 14  # 
 15   
 16  """ 
 17  Contains AMQP message producer classes. 
 18  """ 
 19   
 20  from gofer.messaging import * 
 21  from gofer.messaging.endpoint import Endpoint 
 22  from qpid.messaging import Message 
 23  from logging import getLogger 
 24   
 25  log = getLogger(__name__) 
 26   
 27   
class Producer(Endpoint):
    """
    An AMQP (abstract) message producer.
    """

    def send(self, destination, ttl=None, **body):
        """
        Send a message.
        The keyword arguments are added to an L{Envelope} that carries
        a newly generated serial number, the protocol version and the
        routing (sender id, destination address up to the first ';').
        The dumped envelope is sent as a durable message.
        @param destination: An AMQP destination.
        @type destination: L{Destination}
        @param ttl: Time to Live (seconds)
        @type ttl: float
        @keyword body: envelope body.
        @return: The message serial number.
        @rtype: str
        """
        sn = getuuid()
        address = str(destination)
        # Only the part of the address before the first ';' is used for routing.
        routing = (self.id(), address.split(';')[0])
        envelope = Envelope(sn=sn, version=version, routing=routing)
        envelope += body
        content = envelope.dump()
        message = Message(content=content, durable=True, ttl=ttl)
        sender = self.session().sender(address)
        try:
            sender.send(message)
        finally:
            # Always release the sender, even when the send fails.
            sender.close()
        log.debug('{%s} sent (%s)\n%s', self.id(), address, envelope)
        return sn

    def broadcast(self, destinations, **body):
        """
        Broadcast a message to (N) queues.
        Each destination is sent its own message (and serial number)
        using L{send}.  Because the keywords are forwarded to L{send}
        unchanged, a I{ttl} keyword is applied as the message time to live.
        @param destinations: A list of AMQP destinations.
        @type destinations: [L{Destination},..]
        @keyword body: envelope body.
        @return: A list of (addr,sn).
        @rtype: list
        """
        sns = []
        for dst in destinations:
            sn = self.send(str(dst), **body)
            sns.append((repr(dst), sn))
        return sns
71 72
class BinaryProducer(Endpoint):
    """
    A binary AMQP message producer.
    """

    def send(self, destination, content, ttl=None):
        """
        Send a message.
        The content is sent as-is in a durable message.
        @param destination: An AMQP destination.
        @type destination: L{Destination}
        @param content: The message content
        @type content: buf
        @param ttl: Time to Live (seconds)
        @type ttl: float
        """
        address = str(destination)
        message = Message(content=content, durable=True, ttl=ttl)
        sender = self.session().sender(address)
        try:
            sender.send(message)
        finally:
            # Always release the sender, even when the send fails.
            sender.close()
        # The format string must have exactly one placeholder per argument;
        # the (binary) content is intentionally not logged.
        log.debug('{%s} sent (%s)', self.id(), address)

    def broadcast(self, destinations, content, ttl=None):
        """
        Broadcast a message to (N) queues.
        The same content is sent to each destination using L{send}.
        @param destinations: A list of AMQP destinations.
        @type destinations: [L{Destination},..]
        @param content: The message content
        @type content: buf
        @param ttl: Time to Live (seconds)
        @type ttl: float
        """
        for dst in destinations:
            self.send(str(dst), content, ttl)
105