Package gofer :: Package messaging :: Module endpoint
[hide private]
[frames | no frames]

Source Code for Module gofer.messaging.endpoint

  1  # 
  2  # Copyright (c) 2011 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU Lesser General Public 
  5  # License as published by the Free Software Foundation; either version 
  6  # 2 of the License (LGPLv2) or (at your option) any later version. 
  7  # There is NO WARRANTY for this software, express or implied, 
  8  # including the implied warranties of MERCHANTABILITY, 
  9  # NON-INFRINGEMENT, or FITNESS FOR A PARTICULAR PURPOSE. You should 
 10  # have received a copy of LGPLv2 along with this software; if not, see 
 11  # http://www.gnu.org/licenses/old-licenses/lgpl-2.0.txt. 
 12  # 
 13  # Jeff Ortel <jortel@redhat.com> 
 14  # 
 15   
 16  """ 
 17  AMQP endpoint base classes. 
 18  """ 
 19   
 20  import atexit 
 21  from time import sleep 
 22  from threading import RLock 
 23  from gofer.messaging import * 
 24  from gofer.messaging.broker import Broker 
 25  from gofer.messaging.transport import SSL 
 26  from qpid.messaging import Connection 
 27  from logging import getLogger 
 28   
 29  log = getLogger(__name__) 
 30   
 31   
class SessionPool:
    """
    The AMQP session pool.
    Sessions are pooled by broker url.  Each per-url pool is a tuple
    of two lists: (free, busy).
    @ivar __mutex: The pool mutex.
    @type __mutex: RLock
    @ivar __pools: The per-url pools, keyed by broker url.
    @type __pools: dict
    """

    def __init__(self):
        self.__mutex = RLock()
        self.__pools = {}

    def get(self, url):
        """
        Get the next free session in the pool.
        When the free list has no usable session, a new session is
        created on a new connection to the broker at I{url}.
        The returned session is tracked in the busy list until it is
        released using L{put}.
        @param url: A broker url.
        @type url: str
        @return: A free session.
        @rtype: qpid.messaging.Session
        """
        self.__lock()
        try:
            pool = self.__pool(url)
            ssn = self.__pop(pool)
            if ssn is None:
                broker = Broker(url)
                con = broker.connect()
                ssn = con.session()
            pool[1].append(ssn)
            return ssn
        finally:
            self.__unlock()

    def put(self, url, ssn):
        """
        Release a session back to the pool.
        The session is removed from the busy list (when present) and
        added to the free list (unless already there).
        @param url: A broker url.
        @type url: str
        @param ssn: An AMQP session.
        @type ssn: qpid.messaging.Session
        """
        self.__lock()
        try:
            pool = self.__pool(url)
            if ssn in pool[1]:
                pool[1].remove(ssn)
            if ssn not in pool[0]:
                pool[0].append(ssn)
        finally:
            self.__unlock()

    def purge(self):
        """
        Purge (close) free sessions.
        Every session on every free list is removed and closed.
        A failure to close one session is logged and does not
        prevent the remaining sessions from being purged.
        """
        self.__lock()
        try:
            for pool in self.__pools.values():
                free = pool[0]
                while free:
                    # pop outside of the try so that ssn is always
                    # bound when the error is logged.
                    ssn = free.pop()
                    try:
                        ssn.close()
                    except Exception:
                        log.error(ssn, exc_info=1)
        finally:
            self.__unlock()

    def __pop(self, pool):
        """
        Pop the next available session from the free list.
        The session is acknowledged to purge it of stale transactions.
        A session that fails to acknowledge is logged, discarded and
        the next free session is tried.
        @param pool: A pool (free,busy).
        @type pool: tuple
        @return: The popped session or None when the free
            list has been exhausted.
        @rtype: qpid.messaging.Session
        """
        while pool[0]:
            ssn = pool[0].pop()
            try:
                ssn.acknowledge()
                return ssn
            except Exception:
                log.error(ssn, exc_info=1)
        return None

    def __pool(self, url):
        """
        Obtain the pool for the specified url.
        The pool is created (empty) on first use.
        @param url: A broker url.
        @type url: str
        @return: The session pool. (free,busy)
        @rtype: tuple
        """
        self.__lock()
        try:
            pool = self.__pools.get(url)
            if pool is None:
                pool = ([], [])
                self.__pools[url] = pool
            return pool
        finally:
            self.__unlock()

    def __lock(self):
        # The mutex is an RLock so nested acquisition by the
        # same thread (eg: get() -> __pool()) is safe.
        self.__mutex.acquire()

    def __unlock(self):
        self.__mutex.release()
136 137
class Endpoint:
    """
    Base class for QPID endpoint.
    @cvar LOCALHOST: The broker url used when none is specified.
    @type LOCALHOST: str
    @cvar ssnpool: An AMQP session pool.
    @type ssnpool: L{SessionPool}
    @ivar uuid: The unique endpoint id.
    @type uuid: str
    @ivar url: The broker URL.
    @type url: str
    @ivar __mutex: The endpoint mutex.
    @type __mutex: RLock
    @ivar __session: An AMQP session.
    @type __session: qpid.messaging.Session
    """

    LOCALHOST = 'tcp://localhost:5672'

    ssnpool = SessionPool()

    def __init__(self, uuid=None, url=None):
        """
        @param uuid: The endpoint uuid.  Generated when not specified.
        @type uuid: str
        @param url: The broker url <transport>://<user>/<pass>@<host>:<port>.
            Defaults to L{LOCALHOST}.
        @type url: str
        """
        self.uuid = (uuid or getuuid())
        self.url = (url or self.LOCALHOST)
        self.__mutex = RLock()
        self.__session = None
        # NOTE(review): atexit holds a reference to the bound method
        # (and so to this endpoint) until interpreter exit.
        atexit.register(self.close)

    def id(self):
        """
        Get the endpoint id
        @return: The id.
        @rtype: str
        """
        return self.uuid

    def session(self):
        """
        Get a session for the open connection.
        The session is obtained from the session pool on first use
        and retained until the endpoint is closed.
        @return: An open session.
        @rtype: qpid.messaging.Session
        """
        self._lock()
        try:
            if self.__session is None:
                self.__session = self.ssnpool.get(self.url)
                log.debug('{%s} connected to AMQP' % self.id())
            return self.__session
        finally:
            self._unlock()

    def ack(self):
        """
        Acknowledge all messages received on the session.
        This is best-effort: nothing is done when the endpoint has
        no session and a failed acknowledge is logged, not raised.
        """
        ssn = self.__session
        if ssn is None:
            return
        try:
            ssn.acknowledge(sync=False)
        except Exception:
            log.debug('{%s} acknowledge failed', self.id(), exc_info=1)

    def open(self):
        """
        Open and configure the endpoint.
        The base implementation does nothing.
        """
        pass

    def close(self):
        """
        Close (shutdown) the endpoint.
        The session (if any) is released back to the session pool.
        Safe to call more than once.
        """
        self._lock()
        try:
            if self.__session is None:
                return
            self.ssnpool.put(self.url, self.__session)
            self.__session = None
        finally:
            self._unlock()

    def _lock(self):
        self.__mutex.acquire()

    def _unlock(self):
        self.__mutex.release()

    def __parsedurl(self):
        """
        Split the broker url at the first '://'.
        @return: A tuple of: (transport, remainder).  The transport
            is 'tcp' when the url does not specify one.
        @rtype: tuple
        """
        urlpart = self.url.split('://', 1)
        if len(urlpart) == 1:
            # no transport specified: same (transport, remainder)
            # ordering as when the transport is specified.
            return ('tcp', urlpart[0])
        else:
            return (urlpart[0], urlpart[1])

    def __del__(self):
        try:
            self.close()
        except Exception:
            # uuid may be missing when __init__() did not complete.
            log.error(getattr(self, 'uuid', None), exc_info=1)

    def __str__(self):
        return 'Endpoint id:%s broker @ %s' % (self.id(), self.url)
242