Package gofer :: Package messaging
[hide private]
[frames] | no frames]

Source Code for Package gofer.messaging

  1  # 
  2  # Copyright (c) 2011 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU Lesser General Public 
  5  # License as published by the Free Software Foundation; either version 
  6  # 2 of the License (LGPLv2) or (at your option) any later version. 
  7  # There is NO WARRANTY for this software, express or implied, 
  8  # including the implied warranties of MERCHANTABILITY, 
  9  # NON-INFRINGEMENT, or FITNESS FOR A PARTICULAR PURPOSE. You should 
 10  # have received a copy of LGPLv2 along with this software; if not, see 
 11  # http://www.gnu.org/licenses/old-licenses/lgpl-2.0.txt. 
 12  # 
 13  # Jeff Ortel <jortel@redhat.com> 
 14  # 
 15   
 16  import string 
 17  from uuid import uuid4 
 18  import simplejson as json 
 19   
 20  version = '0.4' 
 21   
 22   
23 -def getuuid():
24 return str(uuid4())
25
26 -def squash(s):
27 sq = [] 28 for c in s: 29 if c in string.whitespace: 30 continue 31 sq.append(c) 32 return ''.join(sq)
33 34
35 -class Options(object):
36 """ 37 Provides a dict-like object that also provides 38 (.) dot notation accessors. 39 """ 40
41 - def __init__(self, *objects, **keywords):
42 for obj in objects: 43 if isinstance(obj, dict): 44 self.__dict__.update(obj) 45 continue 46 if isinstance(obj, Options): 47 self.__dict__.update(obj.__dict__) 48 continue 49 raise ValueError(obj) 50 self.__dict__.update(keywords)
51
52 - def __getattr__(self, name):
53 return self.__dict__.get(name)
54
55 - def __getitem__(self, name):
56 return self.__dict__[name]
57
58 - def __setitem__(self, name, value):
59 self.__dict__[name] = value
60
61 - def __iadd__(self, obj):
62 if isinstance(obj, dict): 63 self.__dict__.update(obj) 64 return self 65 if isinstance(obj, object): 66 self.__dict__.update(object.__dict__) 67 return self 68 raise ValueError(obj)
69
70 - def __len__(self):
71 return len(self.__dict__)
72
73 - def __iter__(self):
74 return iter(self.__dict__)
75
76 - def __repr__(self):
77 return repr(self.__dict__)
78
79 - def __str__(self):
80 return str(self.__dict__)
81 82
83 -class Envelope(Options):
84 """ 85 Extends the dict-like object that also provides 86 JSON serialization. 87 """ 88
89 - def load(self, s):
90 """ 91 Load using a json string. 92 @param s: A json encoded string. 93 @type s: str 94 """ 95 d = json.loads(s) 96 self.__dict__.update(d) 97 return self
98
99 - def dump(self):
100 """ 101 Dump to a json string. 102 @return: A json encoded string. 103 @rtype: str 104 """ 105 def fn(obj): 106 if isinstance(obj, Options): 107 obj = dict(obj.__dict__) 108 for k,v in obj.items(): 109 obj[k] = fn(v) 110 return obj 111 if isinstance(obj, dict): 112 obj = dict(obj) 113 for k,v in obj.items(): 114 obj[k] = fn(v) 115 return obj 116 if isinstance(obj, (tuple, list)): 117 obj = [fn(e) for e in obj] 118 return obj 119 return obj
120 d = fn(self) 121 return json.dumps(d, indent=2)
122 123
124 -class Destination:
125 """ 126 AMQP destinations (topics & queues) 127 """ 128
129 - def address(self):
130 """ 131 Get the destination I{formal} AMQP address which contains 132 properties used to create the destination. 133 @return: The destination address. 134 @rtype: str 135 """ 136 pass
137
138 - def delete(self, session):
139 """ 140 Delete the destination. 141 Implemented using a hack becauase python API does not 142 directly support removing destinations. 143 @param session: An AMQP session. 144 @type session: I{qpid.messaging.Session} 145 """ 146 address = '%s;{delete:always}' % repr(self) 147 sender = session.sender(address) 148 sender.close()
149
150 - def __repr__(self):
151 return str(self).split(';', 1)[0]
152 153
154 -class XBinding:
155 """ 156 Represents an AMQP X-BINDING fragment. 157 @ivar exchange: An exchange name. 158 @type exchange: str 159 @ivar key: An (optional) exchange routing key. 160 @type key: str 161 """ 162
163 - def __init__(self, exchange, key=None):
164 """ 165 @param exchange: An exchange name. 166 @type exchange: str 167 @param key: An (optional) routing key. 168 @type key: str 169 """ 170 self.exchange = exchange 171 self.key = key
172
173 - def __str__(self):
174 if self.key: 175 return "{exchange:%s,key:'%s'}" % (self.exchange, self.key) 176 else: 177 return "{exchange:%s}" % self.exchange
178 179
180 -class XBindings:
181 """ 182 Represents an AMQP X-BINDINGS fragment. 183 @ivar bindings: A list of binding object. 184 @type bindings: list: L{XBinding} 185 """ 186
187 - def __init__(self, bindings=[]):
188 """ 189 @param bindings: A list of binding objects. 190 @type bindings: list: L{Binding} 191 """ 192 self.bindings = bindings
193
194 - def __bindings(self):
195 bindings = [] 196 i = 0 197 for b in self.bindings: 198 if i > 0: 199 bindings.append(',') 200 bindings.append(str(b)) 201 i += 1 202 return ''.join(bindings)
203
204 - def __str__(self):
205 if self.bindings: 206 return 'x-bindings:[%s]' % self.__bindings() 207 else: 208 return ''
209 210
211 -class Topic(Destination):
212 """ 213 Represents and AMQP topic. 214 @ivar topic: The name of the topic. 215 @type topic: str 216 @ivar subject: The subject. 217 @type subject: str 218 @ivar name: The (optional) subscription name. 219 Used for durable subscriptions. 220 @type name: str 221 """ 222
223 - def __init__(self, topic, subject=None):
224 """ 225 @param topic: The name of the topic. 226 @type topic: str 227 @param subject: The subject. 228 @type subject: str 229 """ 230 self.topic = topic 231 self.subject = subject
232
233 - def address(self):
234 """ 235 Get the topic I{formal} AMQP address which contains 236 properties used to create the topic. 237 @return: The topic address. 238 @rtype: str 239 """ 240 fmt = squash(""" 241 %s;{ 242 create:always, 243 node:{type:topic}, 244 link:{ 245 x-declare:{ 246 auto-delete:True, 247 arguments:{no-local:True} 248 } 249 } 250 } 251 """) 252 topic = self.topic 253 if self.subject: 254 topic = '/'.join((topic, self.subject)) 255 return fmt % topic
256
257 - def binding(self):
258 """ 259 Get an binding for the queue. 260 @return: A binding. 261 @rtype: L{XBinding} 262 """ 263 return XBinding(self.topic, self.subject)
264
265 - def __str__(self):
266 return self.address()
267 268
269 -class Queue(Destination):
270 """ 271 Represents and AMQP queue. 272 @ivar name: The name of the queue. 273 @type name: str 274 @ivar durable: The durable flag. 275 @type durable: str 276 """ 277
278 - def __init__(self, name, durable=True, bindings=[]):
279 """ 280 @param name: The name of the queue. 281 @type name: str 282 @param durable: The durable flag. 283 @type durable: str 284 @param bindings: An optional list of bindings used to 285 bind queues to other exchanges. 286 @type bindings: L{Destination} 287 """ 288 self.name = name 289 self.durable = durable 290 self.bindings = XBindings(bindings)
291
292 - def address(self):
293 """ 294 Get the queue I{formal} AMQP address which contains 295 properties used to create the queue. 296 @return: The queue address. 297 @rtype: str 298 """ 299 fmt = squash(""" 300 %s;{ 301 create:always, 302 node:{ 303 type:queue, 304 durable:True, 305 %s 306 }, 307 link:{ 308 durable:True, 309 reliability:at-least-once, 310 x-subscribe:{exclusive:True} 311 } 312 } 313 """) 314 return fmt % (self.name, self.bindings)
315
316 - def tmpAddress(self):
317 """ 318 Get the queue AMQP address which contains 319 properties used to create a temporary queue. 320 @return: The queue address. 321 @rtype: str 322 """ 323 fmt = squash(""" 324 %s;{ 325 create:always, 326 delete:receiver, 327 node:{ 328 type:queue, 329 %s 330 }, 331 link:{ 332 durable:True, 333 reliability:at-least-once, 334 x-subscribe:{exclusive:True} 335 } 336 } 337 """) 338 return fmt % (self.name, self.bindings)
339
340 - def xbinding(self):
341 """ 342 Get an xbinding for the queue. 343 @return: An xbinding. 344 @rtype: L{XBinding} 345 """ 346 return XBinding(self.name)
347
348 - def __str__(self):
349 if self.durable: 350 return self.address() 351 else: 352 return self.tmpAddress()
353