Package gofer :: Package messaging :: Module broker
[hide private]
[frames] | no frames]

Source Code for Module gofer.messaging.broker

  1  # 
  2  # Copyright (c) 2011 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU Lesser General Public 
  5  # License as published by the Free Software Foundation; either version 
  6  # 2 of the License (LGPLv2) or (at your option) any later version. 
  7  # There is NO WARRANTY for this software, express or implied, 
  8  # including the implied warranties of MERCHANTABILITY, 
  9  # NON-INFRINGEMENT, or FITNESS FOR A PARTICULAR PURPOSE. You should 
 10  # have received a copy of LGPLv2 along with this software; if not, see 
 11  # http://www.gnu.org/licenses/old-licenses/lgpl-2.0.txt. 
 12  # 
 13  # Jeff Ortel <jortel@redhat.com> 
 14  # 
 15   
 16   
 17  """ 
 18  Defined AMQP broker objects. 
 19  """ 
 20   
 21  from gofer import Singleton 
 22  from gofer.messaging import * 
 23  from qpid.messaging import Connection 
 24  from threading import RLock 
 25  from logging import getLogger 
 26   
 27  log = getLogger(__name__) 
28 29 30 -class MetaBroker(Singleton):
31 """ 32 Broker MetaClass. 33 Singleton by simple url. 34 """ 35 36 @classmethod
37 - def key(cls, t, d):
38 url = t[0] 39 if not isinstance(url, URL): 40 url = URL(url) 41 return url.simple()
42
43 -class Broker:
44 """ 45 Represents an AMQP broker. 46 @cvar domain: A list dict of brokers. 47 @type domain: dict 48 @ivar url: The broker's url. 49 @type url: L{URL} 50 @ivar cacert: Path to a PEM encoded file containing 51 the CA certificate used to validate the server certificate. 52 @type cacert: str 53 @ivar clientcert: Path to a PEM encoded file containing 54 the private key & certificate used for client authentication. 55 @type clientcert: str 56 """ 57 __metaclass__ = MetaBroker 58 __mutex = RLock() 59 60 @classmethod
61 - def __lock(cls):
62 cls.__mutex.acquire()
63 64 @classmethod
65 - def __unlock(cls):
66 cls.__mutex.release()
67
68 - def __init__(self, url):
69 """ 70 @param url: The broker url <transport>://<host>:<port>. 71 @type url: str 72 """ 73 if isinstance(url, URL): 74 self.url = url 75 else: 76 self.url = URL(url) 77 self.cacert = None 78 self.clientcert = None 79 self.connection = None
80
81 - def id(self):
82 """ 83 Get broker identifier. 84 @return: The broker I{simple} url. 85 @rtype: str 86 """ 87 return self.url.simple()
88
89 - def connect(self):
90 """ 91 Connect to the broker. 92 @return: The AMQP connection object. 93 @rtype: I{Connection} 94 """ 95 self.__lock() 96 try: 97 if self.connection is None: 98 url = self.url.simple() 99 transport = self.url.transport 100 log.info('connecting:\n%s', self) 101 con = Connection( 102 url=url, 103 tcp_nodelay=True, 104 reconnect=True, 105 transport=transport) 106 con.attach() 107 log.info('{%s} connected to AMQP', self.id()) 108 self.connection = con 109 else: 110 con = self.connection 111 return con 112 finally: 113 self.__unlock()
114
115 - def touch(self, address):
116 """ 117 Touch (eval) the specified AMQP address string. 118 Used to perform destination administration. 119 Examples: 120 - myqueue;{delete:always} 121 - mytopic;{create:always,node:node:{type:topic}} 122 @param address: An AMQP address. 123 @type address: str 124 """ 125 connection = self.connect() 126 session = connection.session() 127 sender = session.sender(address) 128 sender.close()
129
130 - def close(self):
131 """ 132 Close the connection to the broker. 133 """ 134 self.__lock() 135 try: 136 try: 137 con = self.connection 138 self.connection = None 139 con.close() 140 except: 141 log.exception(str(self)) 142 finally: 143 self.__unlock()
144
145 - def __str__(self):
146 s = [] 147 s.append('{%s}:' % self.id()) 148 s.append('transport=%s' % self.url.transport.upper()) 149 s.append('host=%s' % self.url.host) 150 s.append('port=%d' % self.url.port) 151 s.append('cacert=%s' % self.cacert) 152 s.append('clientcert=%s' % self.clientcert) 153 return '\n'.join(s)
154
155 156 -class URL:
157 """ 158 Represents a QPID broker URL. 159 @ivar transport: A qpid transport. 160 @type transport: str 161 @ivar host: The host. 162 @type host: str 163 @ivar port: The tcp port. 164 @type port: int 165 """ 166 167 @classmethod
168 - def split(cls, s):
169 """ 170 Split the url string. 171 @param s: A url string format: <transport>://<host>:<port>. 172 @type s: str 173 @return: The url parts: (transport, host, port) 174 @rtype: tuple 175 """ 176 transport, hp = cls.spliturl(s) 177 host, port = cls.splitport(hp) 178 return (transport, host, port)
179 180 @classmethod
181 - def spliturl(cls, s):
182 """ 183 Split the transport and url parts. 184 @param s: A url string format: <transport>://<host>:<port>. 185 @type s: str 186 @return: The urlparts: (transport, hostport) 187 @rtype: tuple 188 """ 189 part = s.split('://', 1) 190 if len(part) > 1: 191 transport, hp = (part[0], part[1]) 192 else: 193 transport, hp = ('tcp', part[0]) 194 return (transport, hp)
195 196 @classmethod
197 - def splitport(cls, s, d=5672):
198 """ 199 Split the host and port. 200 @param s: A url string format: <host>:<port>. 201 @type s: str 202 @return: The urlparts: (host, port) 203 @rtype: tuple 204 """ 205 part = s.split(':') 206 host = part[0] 207 if len(part) < 2: 208 port = d 209 else: 210 port = part[1] 211 return (host, int(port))
212
213 - def simple(self):
214 """ 215 Get the I{simple} string representation: <host>:<port> 216 @return: "<host>:<port>" 217 @rtype: str 218 """ 219 return '%s:%d' % (self.host, self.port)
220
221 - def __init__(self, s):
222 """ 223 @param s: A url string format: <transport>://<host>:<port>. 224 @type s: str 225 """ 226 self.transport,\ 227 self.host,\ 228 self.port = self.split(s)
229
230 - def __hash__(self):
231 return hash(self.simple())
232
233 - def __eq__(self, other):
234 return ( self.simple() == other.simple() )
235
236 - def __str__(self):
237 return '%s://%s:%d' % \ 238 (self.transport, 239 self.host, 240 self.port)
241