1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 AMQP endpoint base classes.
18 """
19
20 import atexit
21 from time import sleep
22 from threading import RLock
23 from gofer.messaging import *
24 from gofer.messaging.broker import Broker
25 from gofer.messaging.transport import SSL
26 from qpid.messaging import Connection
27 from logging import getLogger
28
29 log = getLogger(__name__)
30
31
33 """
34 The AMQP session pool.
35 """
36
def __init__(self):
    """
    Create an empty session pool.
    Pools are stored per broker url; each one is a (free, busy)
    pair of session lists and is created on first use.
    """
    self.__pools = {}
    self.__mutex = RLock()
40
def get(self, url):
    """
    Check out a session for the specified broker.
    A session from the free list is reused when one is available;
    otherwise a new session is opened on a new broker connection.
    Either way the session is recorded on the busy list.
    @param url: A broker url.
    @type url: str
    @return: A free session.
    @rtype: qpid.messaging.Session
    """
    self.__lock()
    try:
        free_busy = self.__pool(url)
        session = self.__pop(free_busy)
        if session is None:
            # nothing reusable: connect to the broker and open a session
            connection = Broker(url).connect()
            session = connection.session()
        busy = free_busy[1]
        busy.append(session)
        return session
    finally:
        self.__unlock()
61
def put(self, url, ssn):
    """
    Hand a session back to the pool so it can be reused.
    The session is taken off the busy list (when present) and
    added to the free list unless it is already there.
    @param url: A broker url.
    @type url: str
    @param ssn: An AMQP session.
    @type ssn: qpid.messaging.Session
    """
    self.__lock()
    try:
        free, busy = self.__pool(url)
        if ssn in busy:
            busy.remove(ssn)
        if ssn in free:
            # already released; never list a session twice
            return
        free.append(ssn)
    finally:
        self.__unlock()
79
def purge(self):
    """
    Purge (close) free sessions.
    Every session on the free list of every pool is removed from
    the list and closed.  A session that fails to close is logged
    and the purge continues with the next one.
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # name and signature are inferred from the docstring - confirm.
    self.__lock()
    try:
        for pool in self.__pools.values():
            # pool is the (free, busy) tuple; only free sessions are purged.
            free = pool[0]
            while free:
                # Pop outside the guarded call so the session is always
                # bound when logged and the list shrinks on every pass.
                ssn = free.pop()
                try:
                    ssn.close()
                except Exception:
                    log.error(ssn, exc_info=1)
    finally:
        self.__unlock()
95
def __pop(self, pool):
    """
    Take the next usable session off the free list.
    Each candidate is acknowledged to purge it of stale transactions.
    A candidate that fails to acknowledge is logged and dropped, and
    the next one is tried.
    @param pool: A pool (free,busy).
    @type pool: tuple
    @return: The popped session, or None when no usable session is left.
    @rtype: qpid.messaging.Session
    """
    free = pool[0]
    while free:
        candidate = free.pop()
        try:
            candidate.acknowledge()
        except:
            log.error(candidate, exc_info=1)
            continue
        return candidate
    return None
112
def __pool(self, url):
    """
    Look up the pool for a broker url, creating it on first use.
    @param url: A broker url.
    @type url: str
    @return: The session pool. (free,busy)
    @rtype: tuple
    """
    self.__lock()
    try:
        pools = self.__pools
        found = pools.get(url)
        if found is not None:
            return found
        # first request for this url: start with empty free and busy lists
        created = ([], [])
        pools[url] = created
        return created
    finally:
        self.__unlock()
130
133
136
137
139 """
140 Base class for QPID endpoint.
141 @cvar ssnpool: An AMQP session pool.
142 @type ssnpool: L{SessionPool}
143 @ivar uuid: The unique endpoint id.
144 @type uuid: str
145 @ivar url: The broker URL.
146 @type url: str
147 @ivar __mutex: The endpoint mutex.
148 @type __mutex: RLock
149 @ivar __session: An AMQP session.
150 @type __session: qpid.messaging.Session
151 """
152
153 LOCALHOST = 'tcp://localhost:5672'
154
155 ssnpool = SessionPool()
156
def __init__(self, uuid=None, url=None):
    """
    Set up the endpoint and register its close() to run at exit.
    @param uuid: The endpoint uuid.  Generated with getuuid() when not specified.
    @type uuid: str
    @param url: The broker url <transport>://<user>/<pass>@<host>:<port>.
        Defaults to LOCALHOST when not specified.
    @type url: str
    """
    if not uuid:
        uuid = getuuid()
    if not url:
        url = self.LOCALHOST
    self.uuid = uuid
    self.url = url
    self.__session = None
    self.__mutex = RLock()
    # make sure the session is released when the interpreter exits
    atexit.register(self.close)
169
def id(self):
    """
    Get the endpoint id.
    @return: The endpoint uuid.
    @rtype: str
    """
    endpoint_id = self.uuid
    return endpoint_id
177
179 """
180 Get a session for the open connection.
181 @return: An open session.
182 @rtype: qpid.messaging.Session
183 """
184 self._lock()
185 try:
186 if self.__session is None:
187 self.__session = self.ssnpool.get(self.url)
188 log.debug('{%s} connected to AMQP' % self.id())
189 return self.__session
190 finally:
191 self._unlock()
192
194 """
195 Acknowledge all messages received on the session.
196 """
197 try:
198 self.__session.acknowledge(sync=False)
199 except:
200 pass
201
203 """
204 Open and configure the endpoint.
205 """
206 pass
207
def close(self):
    """
    Close (shutdown) the endpoint.
    The session, when one is held, is released back to the session
    pool and forgotten.  Does nothing when no session is held.
    """
    self._lock()
    try:
        held = self.__session
        if held is not None:
            self.ssnpool.put(self.url, held)
            self.__session = None
    finally:
        self._unlock()
220
223
226
228 urlpart = self.url.split('://', 1)
229 if len(urlpart) == 1:
230 return (urlpart[0], 'tcp')
231 else:
232 return (urlpart[0], urlpart[1])
233
235 try:
236 self.close()
237 except:
238 log.error(self.uuid, exc_info=1)
239
241 return 'Endpoint id:%s broker @ %s' % (self.id(), self.url)
242