Package gofer :: Module bridge
[hide private]
[frames] | no frames]

Source Code for Module gofer.bridge

  1  # 
  2  # Copyright (c) 2011 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU Lesser General Public 
  5  # License as published by the Free Software Foundation; either version 
  6  # 2 of the License (LGPLv2) or (at your option) any later version. 
  7  # There is NO WARRANTY for this software, express or implied, 
  8  # including the implied warranties of MERCHANTABILITY, 
  9  # NON-INFRINGEMENT, or FITNESS FOR A PARTICULAR PURPOSE. You should 
 10  # have received a copy of LGPLv2 along with this software; if not, see 
 11  # http://www.gnu.org/licenses/old-licenses/lgpl-2.0.txt. 
 12  # 
 13  # Jeff Ortel <jortel@redhat.com> 
 14  # 
 15   
 16   
 17  import socket 
 18  from threading import Thread 
 19  from gofer.messaging import getuuid, Queue 
 20  from gofer.messaging.consumer import Consumer, Reader 
 21  from gofer.messaging.producer import Producer, BinaryProducer 
 22  from logging import getLogger 
 23   
 24  log = getLogger(__name__) 
 25   
 26  # 
 27  # Utils 
 28  # 
 29   
def toq(uuid):
    """
    Build the queue object used to address a tunnel endpoint.
    @param uuid: The queue identifier.
    @return: A queue for I{uuid}, created with durable=False.
    @rtype: L{Queue}
    """
    queue = Queue(uuid, durable=False)
    return queue
#
# Tunnel Endpoints
# TCP->Gateway->Bridge->TCP
#
class Bridge(Consumer):
    """
    Bridge end of the tunnel (TCP->Gateway->Bridge->TCP).
    Consumes requests on the queue built from I{uuid} and, for each one,
    opens a TCP connection to (host, port) and starts the reader/writer
    threads that relay data between that socket and the requesting peer.
    @cvar HOST: The local hostname, resolved once when the class is defined.
    @ivar uuid: The identifier of the queue this bridge consumes.
    @ivar port: The TCP port connected to for each request.
    @ivar host: The host connected to for each request.
    """

    HOST = socket.gethostname()

    def __init__(self, url, uuid, port, host=HOST):
        """
        @param url: The broker url, passed through to L{Consumer}.
        @param uuid: The identifier of the queue to consume.
        @param port: The TCP port to connect to.
        @param host: The host to connect to (default: L{HOST}).
        """
        Consumer.__init__(self, toq(uuid), url=url)
        self.uuid = uuid
        self.port = port
        self.host = host

    def dispatch(self, env):
        """
        Handle one tunnel request.
        Connects to (host, port), tells the requesting peer which private
        queue to write to, then starts a L{TunnelReader} (queue->socket)
        and a L{TunnelWriter} (socket->peer queue).
        NOTE(review): presumably invoked by L{Consumer} for each received
        envelope, and self.url is presumably set by Consumer.__init__ --
        neither is visible in this module.
        @param env: The request envelope; I{env.peer} is the address that
            replies and tunnelled data are sent to.
        @raise Exception: Whatever the setup steps raise; the socket is
            closed before the exception propagates.
        """
        peer = env.peer
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.host, self.port))
            # private queue on which this side receives tunnelled data
            uuid = toq(getuuid())
            p = Producer(url=self.url)
            # tell the peer where to send its data
            p.send(peer, peer=str(uuid))
            reader = Reader(uuid, url=self.url)
            tr = TunnelReader(reader, sock)
            tr.start()
            tw = TunnelWriter(self.url, peer, sock)
            tw.start()
        except Exception:
            # do not leak the socket when the tunnel cannot be established
            sock.close()
            raise
63 64
class Gateway(Thread):
    """
    Gateway end of the tunnel (TCP->Gateway->Bridge->TCP).
    A daemon thread that listens on a local TCP port and, for each
    accepted connection, negotiates a tunnel with the peer bridge.
    @ivar url: The broker url.
    @ivar peer: The identifier of the peer (bridge) queue.
    @ivar port: The TCP port to listen on.
    @type port: int
    """

    def __init__(self, url, peer, port):
        """
        @param url: The broker url.
        @param peer: The identifier of the peer (bridge) queue.
        @param port: The TCP port to listen on; coerced with int().
        """
        Thread.__init__(self)
        self.url = url
        self.peer = peer
        self.port = int(port)
        self.setDaemon(True)

    def run(self):
        """
        Bind to (local hostname, port) and accept connections forever.
        The listening socket is closed if the accept loop terminates
        with an exception.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.bind((socket.gethostname(), self.port))
            sock.listen(5)
            while True:
                (client, address) = sock.accept()
                self._handle(client, address)
        finally:
            sock.close()

    def _handle(self, client, address):
        """
        Set up the tunnel for one accepted connection.
        A failure is logged and the client socket is closed so that a
        connection which could not be tunnelled is not left open.
        @param client: The accepted client socket.
        @param address: The client address, used as the log message.
        """
        try:
            self.accepted(client)
        except Exception:
            # log first: the active exception is what needs reporting
            log.exception(address)
            try:
                client.close()
            except Exception:
                pass

    def accepted(self, sock):
        """
        Negotiate a tunnel for an accepted connection.
        Sends the peer the address of a private reply queue, waits for
        the peer's answer (which carries the peer's own queue in
        I{env.peer}), then starts a L{TunnelReader} (queue->socket) and
        a L{TunnelWriter} (socket->peer queue).
        @param sock: The accepted client socket.
        """
        uuid = toq(getuuid())
        p = Producer(url=self.url)
        p.send(toq(self.peer), peer=str(uuid))
        r = Reader(uuid, url=self.url)
        # the peer's answer; r.next() presumably blocks until it arrives
        env = r.next()
        tr = TunnelReader(r, sock)
        tr.start()
        tw = TunnelWriter(self.url, env.peer, sock)
        tw.start()
95 96
class TunnelReader(Thread):
    """
    Daemon thread that relays data from a queue reader to a socket.
    Each message read with content is written to the socket; a message
    with empty content marks end-of-stream and closes the socket.
    @ivar reader: The queue reader to read messages from.
    @ivar sock: The socket that message content is written to.
    """

    def __init__(self, reader, sock):
        """
        @param reader: An object providing read(timeout) and close().
        @param sock: The (connected) socket to write to.
        """
        Thread.__init__(self)
        self.reader = reader
        self.sock = sock
        self.setDaemon(True)

    def run(self):
        """
        Relay until end-of-stream or a write failure, then close the
        reader (always, even when relaying raised).
        """
        try:
            self.__read()
        finally:
            self.reader.close()

    def __read(self):
        """
        Relay loop: read messages and write their content to the socket.
        """
        self.sock.setsockopt(
            socket.IPPROTO_TCP,
            socket.TCP_NODELAY, 1)
        while True:
            # 5 is presumably a timeout in seconds -- TODO confirm
            m = self.reader.read(5)
            if not m:
                # nothing read; try again
                continue
            if m.content:
                n = self.__write(m.content)
                if n == 0:
                    # broken
                    break
            else:
                # eof
                self.close()
                break

    def __write(self, content):
        """
        Write all of I{content} to the socket.
        Uses sendall() because send() may write only part of the buffer,
        which would silently drop the rest of the tunnelled data.
        @param content: The (non-empty) data to write.
        @return: The number of bytes written, or 0 when the write failed.
        @rtype: int
        """
        try:
            self.sock.sendall(content)
            return len(content)
        except Exception:
            return 0

    def close(self):
        """
        Close the socket, ignoring any error (best effort).
        """
        try:
            self.sock.close()
        except Exception:
            pass
140 141
class TunnelWriter(Thread):
    """
    Daemon thread that relays data from a socket to a queue.
    Everything received on the socket is sent to the queue; when the
    socket yields nothing more, an empty message is sent to mark
    end-of-stream and the socket is closed.
    @cvar BUFSIZE: The maximum number of bytes requested per recv().
    @ivar url: The broker url.
    @ivar sock: The socket to read from.
    @ivar queue: The destination queue address.
    """

    BUFSIZE = 0x100000

    def __init__(self, url, queue, sock):
        """
        @param url: The broker url.
        @param queue: The destination queue address.
        @param sock: The (connected) socket to read from.
        """
        Thread.__init__(self)
        self.setDaemon(True)
        self.url = url
        self.queue = queue
        self.sock = sock

    def run(self):
        """
        Forward received data to the queue until the socket is drained
        or fails, then send the empty end-of-stream message and close.
        """
        producer = BinaryProducer(url=self.url)
        chunk = self.__read()
        while chunk:
            producer.send(self.queue, chunk)
            chunk = self.__read()
        # empty content tells the other side the stream has ended
        producer.send(self.queue, '')
        self.close()

    def __read(self):
        """
        Receive up to L{BUFSIZE} bytes from the socket.
        @return: The data received, or None when recv() raised.
        """
        try:
            received = self.sock.recv(self.BUFSIZE)
        except:
            # a failed read is treated by run() like end-of-stream
            received = None
        return received

    def close(self):
        """
        Close the socket, ignoring any error (best effort).
        """
        try:
            self.sock.close()
        except:
            pass
#
# Testing
#

UUID = 'GATEWAY'
URL = 'tcp://localhost:5672'

if __name__ == '__main__':
    # Manual smoke test: run both tunnel ends in this process.
    # The gateway listens on port 9443 and the bridge connects each
    # tunnelled connection to port 443 on the local host.
    from logging import basicConfig
    basicConfig()
    bridge = Bridge(URL, UUID, 443)
    bridge.start()
    gateway = Gateway(URL, UUID, 9443)
    gateway.start()
    # the gateway never returns from run(); wait on it to keep the
    # process alive
    gateway.join()