1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 """
18 Defines AMQP broker objects.
19 """
20
21 from gofer import Singleton
22 from gofer.messaging import *
23 from qpid.messaging import Connection
24 from threading import RLock
25 from logging import getLogger
26
27 log = getLogger(__name__)
42
44 """
45 Represents an AMQP broker.
46 @cvar domain: A dict of brokers.
47 @type domain: dict
48 @ivar url: The broker's url.
49 @type url: L{URL}
50 @ivar cacert: Path to a PEM encoded file containing
51 the CA certificate used to validate the server certificate.
52 @type cacert: str
53 @ivar clientcert: Path to a PEM encoded file containing
54 the private key & certificate used for client authentication.
55 @type clientcert: str
56 """
57 __metaclass__ = MetaBroker
58 __mutex = RLock()
59
60 @classmethod
63
64 @classmethod
67
69 """
70 @param url: The broker url <transport>://<host>:<port>.
71 @type url: str
72 """
73 if isinstance(url, URL):
74 self.url = url
75 else:
76 self.url = URL(url)
77 self.cacert = None
78 self.clientcert = None
79 self.connection = None
80
82 """
83 Get broker identifier.
84 @return: The broker I{simple} url.
85 @rtype: str
86 """
87 return self.url.simple()
88
90 """
91 Connect to the broker.
92 @return: The AMQP connection object.
93 @rtype: I{Connection}
94 """
95 self.__lock()
96 try:
97 if self.connection is None:
98 url = self.url.simple()
99 transport = self.url.transport
100 log.info('connecting:\n%s', self)
101 con = Connection(
102 url=url,
103 tcp_nodelay=True,
104 reconnect=True,
105 transport=transport)
106 con.attach()
107 log.info('{%s} connected to AMQP', self.id())
108 self.connection = con
109 else:
110 con = self.connection
111 return con
112 finally:
113 self.__unlock()
114
def touch(self, address):
    """
    Touch (eval) the specified AMQP address string.
    Used to perform destination administration.
    A sender is opened on the address and immediately closed; the
    session used for this is closed afterwards, even on failure.
    Examples:
      - myqueue;{delete:always}
      - mytopic;{create:always,node:{type:topic}}
    @param address: An AMQP address.
    @type address: str
    """
    # connect() caches and returns the broker connection, so every
    # touch() runs on the same connection object.
    connection = self.connect()
    session = connection.session()
    try:
        sender = session.sender(address)
        sender.close()
    finally:
        # Close the session opened above so repeated calls do not leave
        # sessions behind on the long-lived connection.
        session.close()
129
131 """
132 Close the connection to the broker.
133 """
134 self.__lock()
135 try:
136 try:
137 con = self.connection
138 self.connection = None
139 con.close()
140 except:
141 log.exception(str(self))
142 finally:
143 self.__unlock()
144
146 s = []
147 s.append('{%s}:' % self.id())
148 s.append('transport=%s' % self.url.transport.upper())
149 s.append('host=%s' % self.url.host)
150 s.append('port=%d' % self.url.port)
151 s.append('cacert=%s' % self.cacert)
152 s.append('clientcert=%s' % self.clientcert)
153 return '\n'.join(s)
154
157 """
158 Represents a QPID broker URL.
159 @ivar transport: A qpid transport.
160 @type transport: str
161 @ivar host: The host.
162 @type host: str
163 @ivar port: The tcp port.
164 @type port: int
165 """
166
167 @classmethod
169 """
170 Split the url string.
171 @param s: A url string format: <transport>://<host>:<port>.
172 @type s: str
173 @return: The url parts: (transport, host, port)
174 @rtype: tuple
175 """
176 transport, hp = cls.spliturl(s)
177 host, port = cls.splitport(hp)
178 return (transport, host, port)
179
180 @classmethod
182 """
183 Split the transport and url parts.
184 @param s: A url string format: <transport>://<host>:<port>.
185 @type s: str
186 @return: The url parts: (transport, hostport)
187 @rtype: tuple
188 """
189 part = s.split('://', 1)
190 if len(part) > 1:
191 transport, hp = (part[0], part[1])
192 else:
193 transport, hp = ('tcp', part[0])
194 return (transport, hp)
195
196 @classmethod
198 """
199 Split the host and port.
200 @param s: A url string format: <host>:<port>.
201 @type s: str
202 @return: The url parts: (host, port)
203 @rtype: tuple
204 """
205 part = s.split(':')
206 host = part[0]
207 if len(part) < 2:
208 port = d
209 else:
210 port = part[1]
211 return (host, int(port))
212
214 """
215 Get the I{simple} string representation: <host>:<port>
216 @return: "<host>:<port>"
217 @rtype: str
218 """
219 return '%s:%d' % (self.host, self.port)
220
222 """
223 @param s: A url string format: <transport>://<host>:<port>.
224 @type s: str
225 """
226 self.transport,\
227 self.host,\
228 self.port = self.split(s)
229
231 return hash(self.simple())
232
235
237 return '%s://%s:%d' % \
238 (self.transport,
239 self.host,
240 self.port)
241