1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 import string
17 from uuid import uuid4
18 import simplejson as json
19
20 version = '0.4'
21
22
25
27 sq = []
28 for c in s:
29 if c in string.whitespace:
30 continue
31 sq.append(c)
32 return ''.join(sq)
33
34
36 """
37 Provides a dict-like object that also provides
38 (.) dot notation accessors.
39 """
40
41 - def __init__(self, *objects, **keywords):
42 for obj in objects:
43 if isinstance(obj, dict):
44 self.__dict__.update(obj)
45 continue
46 if isinstance(obj, Options):
47 self.__dict__.update(obj.__dict__)
48 continue
49 raise ValueError(obj)
50 self.__dict__.update(keywords)
51
54
56 return self.__dict__[name]
57
59 self.__dict__[name] = value
60
62 if isinstance(obj, dict):
63 self.__dict__.update(obj)
64 return self
65 if isinstance(obj, object):
66 self.__dict__.update(object.__dict__)
67 return self
68 raise ValueError(obj)
69
71 return len(self.__dict__)
72
74 return iter(self.__dict__)
75
77 return repr(self.__dict__)
78
80 return str(self.__dict__)
81
82
84 """
85 Extends the dict-like object that also provides
86 JSON serialization.
87 """
88
90 """
91 Load using a json string.
92 @param s: A json encoded string.
93 @type s: str
94 """
95 d = json.loads(s)
96 self.__dict__.update(d)
97 return self
98
100 """
101 Dump to a json string.
102 @return: A json encoded string.
103 @rtype: str
104 """
105 def fn(obj):
106 if isinstance(obj, Options):
107 obj = dict(obj.__dict__)
108 for k,v in obj.items():
109 obj[k] = fn(v)
110 return obj
111 if isinstance(obj, dict):
112 obj = dict(obj)
113 for k,v in obj.items():
114 obj[k] = fn(v)
115 return obj
116 if isinstance(obj, (tuple, list)):
117 obj = [fn(e) for e in obj]
118 return obj
119 return obj
120 d = fn(self)
121 return json.dumps(d, indent=2)
122
123
125 """
126 AMQP destinations (topics & queues)
127 """
128
130 """
131 Get the destination I{formal} AMQP address which contains
132 properties used to create the destination.
133 @return: The destination address.
134 @rtype: str
135 """
136 pass
137
139 """
140 Delete the destination.
141 Implemented using a hack because the python API does not
142 directly support removing destinations.
143 @param session: An AMQP session.
144 @type session: I{qpid.messaging.Session}
145 """
146 address = '%s;{delete:always}' % repr(self)
147 sender = session.sender(address)
148 sender.close()
149
151 return str(self).split(';', 1)[0]
152
153
155 """
156 Represents an AMQP X-BINDING fragment.
157 @ivar exchange: An exchange name.
158 @type exchange: str
159 @ivar key: An (optional) exchange routing key.
160 @type key: str
161 """
162
163 - def __init__(self, exchange, key=None):
164 """
165 @param exchange: An exchange name.
166 @type exchange: str
167 @param key: An (optional) routing key.
168 @type key: str
169 """
170 self.exchange = exchange
171 self.key = key
172
174 if self.key:
175 return "{exchange:%s,key:'%s'}" % (self.exchange, self.key)
176 else:
177 return "{exchange:%s}" % self.exchange
178
179
181 """
182 Represents an AMQP X-BINDINGS fragment.
183 @ivar bindings: A list of binding objects.
184 @type bindings: list: L{XBinding}
185 """
186
188 """
189 @param bindings: A list of binding objects.
190 @type bindings: list: L{Binding}
191 """
192 self.bindings = bindings
193
195 bindings = []
196 i = 0
197 for b in self.bindings:
198 if i > 0:
199 bindings.append(',')
200 bindings.append(str(b))
201 i += 1
202 return ''.join(bindings)
203
205 if self.bindings:
206 return 'x-bindings:[%s]' % self.__bindings()
207 else:
208 return ''
209
210
211 -class Topic(Destination):
212 """
213 Represents an AMQP topic.
214 @ivar topic: The name of the topic.
215 @type topic: str
216 @ivar subject: The subject.
217 @type subject: str
218 @ivar name: The (optional) subscription name.
219 Used for durable subscriptions.
220 @type name: str
221 """
222
def __init__(self, topic, subject=None):
    """
    Store the topic name and the (optional) subject.
    @param topic: The name of the topic.
    @type topic: str
    @param subject: The (optional) subject.
    @type subject: str
    """
    self.topic = topic
    self.subject = subject
232
234 """
235 Get the topic I{formal} AMQP address which contains
236 properties used to create the topic.
237 @return: The topic address.
238 @rtype: str
239 """
240 fmt = squash("""
241 %s;{
242 create:always,
243 node:{type:topic},
244 link:{
245 x-declare:{
246 auto-delete:True,
247 arguments:{no-local:True}
248 }
249 }
250 }
251 """)
252 topic = self.topic
253 if self.subject:
254 topic = '/'.join((topic, self.subject))
255 return fmt % topic
256
258 """
259 Get a binding for the queue.
260 @return: A binding.
261 @rtype: L{XBinding}
262 """
263 return XBinding(self.topic, self.subject)
264
267
268
269 -class Queue(Destination):
270 """
271 Represents an AMQP queue.
272 @ivar name: The name of the queue.
273 @type name: str
274 @ivar durable: The durable flag.
275 @type durable: bool
276 """
277
def __init__(self, name, durable=True, bindings=None):
    """
    @param name: The name of the queue.
    @type name: str
    @param durable: The durable flag.
    @type durable: bool
    @param bindings: An optional list of bindings used to
        bind queues to other exchanges.  When omitted (or None)
        the queue has no bindings.
    @type bindings: list: L{XBinding}
    """
    # A fresh list per instance: a literal [] default would be one
    # shared object aliased into every queue's XBindings.
    if bindings is None:
        bindings = []
    self.name = name
    self.durable = durable
    self.bindings = XBindings(bindings)
291
293 """
294 Get the queue I{formal} AMQP address which contains
295 properties used to create the queue.
296 @return: The queue address.
297 @rtype: str
298 """
299 fmt = squash("""
300 %s;{
301 create:always,
302 node:{
303 type:queue,
304 durable:True,
305 %s
306 },
307 link:{
308 durable:True,
309 reliability:at-least-once,
310 x-subscribe:{exclusive:True}
311 }
312 }
313 """)
314 return fmt % (self.name, self.bindings)
315
317 """
318 Get the queue AMQP address which contains
319 properties used to create a temporary queue.
320 @return: The queue address.
321 @rtype: str
322 """
323 fmt = squash("""
324 %s;{
325 create:always,
326 delete:receiver,
327 node:{
328 type:queue,
329 %s
330 },
331 link:{
332 durable:True,
333 reliability:at-least-once,
334 x-subscribe:{exclusive:True}
335 }
336 }
337 """)
338 return fmt % (self.name, self.bindings)
339
341 """
342 Get an xbinding for the queue.
343 @return: An xbinding.
344 @rtype: L{XBinding}
345 """
346 return XBinding(self.name)
347
349 if self.durable:
350 return self.address()
351 else:
352 return self.tmpAddress()
353