1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 Contains AMQP message producer classes.
18 """
19
20 from gofer.messaging import *
21 from gofer.messaging.endpoint import Endpoint
22 from qpid.messaging import Message
23 from logging import getLogger
24
25 log = getLogger(__name__)
26
27
29 """
30 An AMQP (abstract) message producer.
31 """
32
def send(self, destination, ttl=None, **body):
    """
    Send a message.
    The keyword arguments are merged into an envelope stamped with a
    new serial number, the protocol version and the routing pair; the
    dumped envelope is sent as a durable message.
    @param destination: An AMQP destination.
    @type destination: L{Destination}
    @param ttl: Time to Live (seconds)
    @type ttl: float
    @keyword body: envelope body.
    @return: The message serial number.
    @rtype: str
    """
    sn = getuuid()
    address = str(destination)
    # Routing is (our id, the part of the address before the first ';').
    routing = (self.id(), address.split(';')[0])
    envelope = Envelope(sn=sn, version=version, routing=routing)
    envelope += body
    message = Message(content=envelope.dump(), durable=True, ttl=ttl)
    sender = self.session().sender(address)
    try:
        sender.send(message)
    finally:
        # Always release the sender, even when send() raises.
        sender.close()
    log.debug('{%s} sent (%s)\n%s', self.id(), address, envelope)
    return sn
56
58 """
59 Broadcast a message to (N) queues.
60 @param destinations: A list of AMQP destinations.
61 @type destinations: [L{Destination},..]
62 @keyword body: envelope body.
63 @return: A list of (addr,sn).
64 @rtype: list
65 """
66 sns = []
67 for dst in destinations:
68 sn = self.send(str(dst), **body)
69 sns.append((repr(dst),sn))
70 return sns
71
72
74 """
75 A binary AMQP message producer.
76 """
77
def send(self, destination, content, ttl=None):
    """
    Send a message.
    The content is sent as-is in a durable message; nothing is returned.
    @param destination: An AMQP destination.
    @type destination: L{Destination}
    @param content: The message content
    @type content: buf
    @param ttl: Time to Live (seconds)
    @type ttl: float
    """
    address = str(destination)
    message = Message(content=content, durable=True, ttl=ttl)
    sender = self.session().sender(address)
    try:
        sender.send(message)
    finally:
        # Always release the sender, even when send() raises.
        sender.close()
    # The format previously had a third '%s' with no matching argument,
    # which made logging fail to render the record.
    log.debug('{%s} sent (%s)', self.id(), address)
94
def broadcast(self, destinations, content, ttl=None):
    """
    Broadcast a message to (N) queues.
    Each destination is converted with str() and passed to send() in
    the order given; nothing is returned.
    @param destinations: A list of AMQP destinations.
    @type destinations: [L{Destination},..]
    @param content: The message content
    @type content: buf
    @param ttl: Time to Live (seconds)
    @type ttl: float
    """
    for dst in destinations:
        self.send(str(dst), content, ttl)
105